From f942e5ced397f2f6bf3cef86efd695c43b527ce2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AD=94=E6=96=B9=E4=B8=8D=E5=9C=A8=E6=89=8B?= <39549317+wenhemin@users.noreply.github.com> Date: Thu, 6 Feb 2020 09:29:23 +0800 Subject: [PATCH] [feature] data synchronization function (#1881) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feature:data synchronization function * feature:data synchronization function * feature:data synchronization function * fix:add license * fix:resource close bug * fix:remove class author * fix:Improve test coverage * fix:add UT * fix:add UT * fix:data sync node renamed to datax * fix:add UT * fix: UT bug * fix:Optimize variable name * fix:pom * fix:Variable name error * fix:optimize the code --- .../conf/env/.dolphinscheduler_env.sh | 1 + .../common/enums/TaskType.java | 4 +- .../common/task/datax/DataxParameters.java | 192 +++++++ .../common/utils/TaskParametersUtils.java | 3 + .../server/utils/DataxUtils.java | 129 +++++ .../server/worker/task/AbstractTask.java | 4 + .../server/worker/task/TaskManager.java | 3 + .../server/worker/task/datax/DataxTask.java | 522 ++++++++++++++++++ .../server/utils/DataxUtilsTest.java | 108 ++++ .../worker/task/datax/DataxTaskTest.java | 352 ++++++++++++ .../js/conf/home/pages/dag/_source/config.js | 4 + .../js/conf/home/pages/dag/_source/dag.scss | 3 + .../pages/dag/_source/formModel/formModel.vue | 10 +- .../dag/_source/formModel/tasks/datax.vue | 292 ++++++++++ .../conf/home/pages/dag/img/toolbar_DATAX.png | Bin 0 -> 571 bytes .../conf/home/pages/dag/img/toolbar_ETL.png | Bin 0 -> 3117 bytes .../src/js/module/i18n/locale/en_US.js | 10 +- .../src/js/module/i18n/locale/zh_CN.js | 10 +- pom.xml | 2 + 19 files changed, 1645 insertions(+), 4 deletions(-) create mode 100755 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java create mode 100755 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java create mode 100755 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java create mode 100755 dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue create mode 100644 dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_DATAX.png create mode 100755 dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_ETL.png diff --git a/dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh b/dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh index 960d971dd8..8e842fe28e 100644 --- a/dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh +++ b/dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh @@ -18,3 +18,4 @@ export PYTHON_HOME=/usr/bin/python export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 export PATH=$PYTHON_HOME:$JAVA_HOME/bin:$PATH +export DATAX_HOME=/opt/datax/bin/datax.py \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java index 45f36883e3..b996c3aec9 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java @@ -33,6 +33,7 @@ public enum TaskType { * 7 DEPENDENT * 8 FLINK * 9 HTTP + * 10 DATAX */ SHELL(0, "shell"), SQL(1, "sql"), @@ -43,7 +44,8 @@ public enum TaskType { PYTHON(6, "python"), DEPENDENT(7, "dependent"), FLINK(8, "flink"), - HTTP(9, "http"); + HTTP(9, "http"), + DATAX(10, "datax"); TaskType(int code, String descp){ this.code = code; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java new file mode 100755 index 0000000000..95dd505c02 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.task.datax; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.task.AbstractParameters; + +/** + * DataX parameter + */ +public class DataxParameters extends AbstractParameters { + + /** + * data source type,eg MYSQL, POSTGRES ... + */ + private String dsType; + + /** + * datasource id + */ + private int dataSource; + + /** + * data target type,eg MYSQL, POSTGRES ... + */ + private String dtType; + + /** + * datatarget id + */ + private int dataTarget; + + /** + * sql + */ + private String sql; + + /** + * target table + */ + private String targetTable; + + /** + * Pre Statements + */ + private List preStatements; + + /** + * Post Statements + */ + private List postStatements; + + /** + * speed byte num + */ + private int jobSpeedByte; + + /** + * speed record count + */ + private int jobSpeedRecord; + + public String getDsType() { + return dsType; + } + + public void setDsType(String dsType) { + this.dsType = dsType; + } + + public int getDataSource() { + return dataSource; + } + + public void setDataSource(int dataSource) { + this.dataSource = dataSource; + } + + public String getDtType() { + return dtType; + } + + public void setDtType(String dtType) { + this.dtType = dtType; + } + + public int getDataTarget() { + return dataTarget; + } + + public void setDataTarget(int dataTarget) { + this.dataTarget = dataTarget; + } + + public String getSql() { + return sql; + } + + public void setSql(String sql) { + this.sql = sql; + } + + public String getTargetTable() { + return targetTable; + } + + public void setTargetTable(String targetTable) { + this.targetTable = targetTable; + } + + public List getPreStatements() { + return preStatements; + } + + public void setPreStatements(List preStatements) { + this.preStatements = preStatements; + } + + public List getPostStatements() { + return postStatements; + } + + public void setPostStatements(List postStatements) { + this.postStatements = postStatements; + } + + public int getJobSpeedByte() { + return jobSpeedByte; + } + + public void setJobSpeedByte(int jobSpeedByte) { + this.jobSpeedByte = jobSpeedByte; + } + + public int getJobSpeedRecord() { + return jobSpeedRecord; + } + + public void setJobSpeedRecord(int jobSpeedRecord) { + this.jobSpeedRecord = jobSpeedRecord; + } + + @Override + public boolean checkParameters() { + if (!(dataSource != 0 + && dataTarget != 0 + && StringUtils.isNotEmpty(sql) + && StringUtils.isNotEmpty(targetTable))) { + return false; + } + + return true; + } + + @Override + public List getResourceFilesList() { + return new ArrayList<>(); + } + + @Override + public String toString() { + return "DataxParameters{" + + "dsType='" + dsType + '\'' + + ", dataSource=" + dataSource + + ", dtType='" + dtType + '\'' + + ", dataTarget=" + dataTarget + + ", sql='" + sql + '\'' + + ", targetTable='" + targetTable + '\'' + + ", preStatements=" + preStatements + + ", postStatements=" + postStatements + + ", jobSpeedByte=" + jobSpeedByte + + ", jobSpeedRecord=" + jobSpeedRecord + + '}'; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java index 28e2593359..a2ae6a68e3 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.http.HttpParameters; import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; @@ -68,6 +69,8 @@ public class TaskParametersUtils { return JSONUtils.parseObject(parameter, FlinkParameters.class); case HTTP: return JSONUtils.parseObject(parameter, HttpParameters.class); + case DATAX: + return JSONUtils.parseObject(parameter, DataxParameters.class); default: return null; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java new file mode 100755 index 0000000000..930098919b --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.utils; + + +import org.apache.dolphinscheduler.common.enums.DbType; + +import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; +import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser; +import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser; +import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser; +import com.alibaba.druid.sql.parser.SQLStatementParser; + + +public class DataxUtils { + + public static final String DATAX_READER_PLUGIN_MYSQL = "mysqlreader"; + + public static final String DATAX_READER_PLUGIN_POSTGRESQL = "postgresqlreader"; + + public static final String DATAX_READER_PLUGIN_ORACLE = "oraclereader"; + + public static final String DATAX_READER_PLUGIN_SQLSERVER = "sqlserverreader"; + + public static final String DATAX_WRITER_PLUGIN_MYSQL = "mysqlwriter"; + + public static final String DATAX_WRITER_PLUGIN_POSTGRESQL = "postgresqlwriter"; + + public static final String DATAX_WRITER_PLUGIN_ORACLE = "oraclewriter"; + + public static final String DATAX_WRITER_PLUGIN_SQLSERVER = "sqlserverwriter"; + + public static String getReaderPluginName(DbType dbType) { + switch (dbType) { + case MYSQL: + return DATAX_READER_PLUGIN_MYSQL; + case POSTGRESQL: + return DATAX_READER_PLUGIN_POSTGRESQL; + case ORACLE: + return DATAX_READER_PLUGIN_ORACLE; + case SQLSERVER: + return DATAX_READER_PLUGIN_SQLSERVER; + default: + return null; + } + } + + public static String getWriterPluginName(DbType dbType) { + switch (dbType) { + case MYSQL: + return DATAX_WRITER_PLUGIN_MYSQL; + case POSTGRESQL: + return DATAX_WRITER_PLUGIN_POSTGRESQL; + case ORACLE: + return DATAX_WRITER_PLUGIN_ORACLE; + case SQLSERVER: + return DATAX_WRITER_PLUGIN_SQLSERVER; + default: + return null; + } + } + + public static SQLStatementParser getSqlStatementParser(DbType dbType, String sql) { + switch (dbType) { + case MYSQL: + return new MySqlStatementParser(sql); + case POSTGRESQL: + return new PGSQLStatementParser(sql); + case ORACLE: + return new OracleStatementParser(sql); + case SQLSERVER: + return new SQLServerStatementParser(sql); + default: + return null; + } + } + + public static String[] convertKeywordsColumns(DbType dbType, String[] columns) { + if (columns == null) { + return null; + } + + String[] toColumns = new String[columns.length]; + for (int i = 0; i < columns.length; i++ ) { + toColumns[i] = doConvertKeywordsColumn(dbType, columns[i]); + } + + return toColumns; + } + + public static String doConvertKeywordsColumn(DbType dbType, String column) { + if (column == null) { + return column; + } + + column = column.trim(); + column = column.replace("`", ""); + column = column.replace("\"", ""); + column = column.replace("'", ""); + + switch (dbType) { + case MYSQL: + return String.format("`%s`", column); + case POSTGRESQL: + return String.format("\"%s\"", column); + case ORACLE: + return String.format("\"%s\"", column); + case SQLSERVER: + return String.format("`%s`", column); + default: + return column; + } + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index 999863f76e..f2772d0747 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskRecordStatus; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; @@ -194,6 +195,9 @@ public abstract class AbstractTask { case PYTHON: paramsClass = PythonParameters.class; break; + case DATAX: + paramsClass = DataxParameters.class; + break; default: logger.error("not support this task type: {}", taskType); throw new IllegalArgumentException("not support this task type"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java index 310d9cad9a..67deb7a3fa 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.utils.EnumUtils; import org.apache.dolphinscheduler.server.worker.task.dependent.DependentTask; +import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask; import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask; import org.apache.dolphinscheduler.server.worker.task.http.HttpTask; import org.apache.dolphinscheduler.server.worker.task.mr.MapReduceTask; @@ -65,6 +66,8 @@ public class TaskManager { return new DependentTask(props, logger); case HTTP: return new HttpTask(props, logger); + case DATAX: + return new DataxTask(props, logger); default: logger.error("unsupport task type: {}", taskType); throw new IllegalArgumentException("not support task type"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java new file mode 100755 index 0000000000..0de2bbc7c6 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.datax; + + +import java.io.File; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.io.FileUtils; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.job.db.BaseDataSource; +import org.apache.dolphinscheduler.common.job.db.DataSourceFactory; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.datax.DataxParameters; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.utils.DataxUtils; +import org.apache.dolphinscheduler.server.utils.ParamUtils; +import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; +import org.apache.dolphinscheduler.server.worker.task.AbstractTask; +import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.slf4j.Logger; + +import com.alibaba.druid.sql.ast.SQLStatement; +import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; +import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; +import com.alibaba.druid.sql.ast.statement.SQLSelect; +import com.alibaba.druid.sql.ast.statement.SQLSelectItem; +import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock; +import com.alibaba.druid.sql.ast.statement.SQLSelectStatement; +import com.alibaba.druid.sql.ast.statement.SQLUnionQuery; +import com.alibaba.druid.sql.parser.SQLStatementParser; +import com.alibaba.fastjson.JSONObject; + + +/** + * DataX task + */ +public class DataxTask extends AbstractTask { + + /** + * python process(datax only supports version 2.7 by default) + */ + private static final String DATAX_PYTHON = "python2.7"; + + /** + * datax home path + */ + private static final String DATAX_HOME_EVN = "${DATAX_HOME}"; + + /** + * datax channel count + */ + private static final int DATAX_CHANNEL_COUNT = 1; + + /** + * datax parameters + */ + private DataxParameters dataXParameters; + + /** + * task dir + */ + private String taskDir; + + /** + * shell command executor + */ + private ShellCommandExecutor shellCommandExecutor; + + /** + * process database access + */ + private ProcessDao processDao; + + /** + * constructor + * + * @param props + * props + * @param logger + * logger + */ + public DataxTask(TaskProps props, Logger logger) { + super(props, logger); + + this.taskDir = props.getTaskDir(); + logger.info("task dir : {}", taskDir); + + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, props.getTaskDir(), props.getTaskAppId(), + props.getTaskInstId(), props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(), + props.getTaskTimeout(), logger); + + this.processDao = SpringApplicationContext.getBean(ProcessDao.class); + } + + /** + * init DataX config + */ + @Override + public void init() { + logger.info("datax task params {}", taskProps.getTaskParams()); + dataXParameters = JSONUtils.parseObject(taskProps.getTaskParams(), DataxParameters.class); + + if (!dataXParameters.checkParameters()) { + throw new RuntimeException("datax task params is not valid"); + } + } + + /** + * run DataX process + * + * @throws Exception + */ + @Override + public void handle() + throws Exception { + try { + // set the name of the current thread + String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskProps.getTaskAppId()); + Thread.currentThread().setName(threadLoggerInfoName); + + // run datax process + String jsonFilePath = buildDataxJsonFile(); + String shellCommandFilePath = buildShellCommandFile(jsonFilePath); + exitStatusCode = shellCommandExecutor.run(shellCommandFilePath, processDao); + } + catch (Exception e) { + exitStatusCode = -1; + throw e; + } + } + + /** + * cancel DataX process + * + * @param cancelApplication + * @throws Exception + */ + @Override + public void cancelApplication(boolean cancelApplication) + throws Exception { + // cancel process + shellCommandExecutor.cancelApplication(); + } + + /** + * build datax configuration file + * + * @return + * @throws Exception + */ + private String buildDataxJsonFile() + throws Exception { + // generate json + String fileName = String.format("%s/%s_job.json", taskDir, taskProps.getTaskAppId()); + + Path path = new File(fileName).toPath(); + if (Files.exists(path)) { + return fileName; + } + + JSONObject job = new JSONObject(); + job.put("content", buildDataxJobContentJson()); + job.put("setting", buildDataxJobSettingJson()); + + JSONObject root = new JSONObject(); + root.put("job", job); + root.put("core", buildDataxCoreJson()); + + logger.debug("datax job json : {}", root.toString()); + + // create datax json file + FileUtils.writeStringToFile(new File(fileName), root.toString(), Charset.forName("UTF-8")); + return fileName; + } + + /** + * build datax job config + * + * @return + * @throws SQLException + */ + private List buildDataxJobContentJson() + throws SQLException { + DataSource dataSource = processDao.findDataSourceById(dataXParameters.getDataSource()); + BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(dataSource.getType(), + dataSource.getConnectionParams()); + + DataSource dataTarget = processDao.findDataSourceById(dataXParameters.getDataTarget()); + BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(dataTarget.getType(), + dataTarget.getConnectionParams()); + + List readerConnArr = new ArrayList<>(); + JSONObject readerConn = new JSONObject(); + readerConn.put("querySql", new String[] {dataXParameters.getSql()}); + readerConn.put("jdbcUrl", new String[] {dataSourceCfg.getJdbcUrl()}); + readerConnArr.add(readerConn); + + JSONObject readerParam = new JSONObject(); + readerParam.put("username", dataSourceCfg.getUser()); + readerParam.put("password", dataSourceCfg.getPassword()); + readerParam.put("connection", readerConnArr); + + JSONObject reader = new JSONObject(); + reader.put("name", DataxUtils.getReaderPluginName(dataSource.getType())); + reader.put("parameter", readerParam); + + List writerConnArr = new ArrayList<>(); + JSONObject writerConn = new JSONObject(); + writerConn.put("table", new String[] {dataXParameters.getTargetTable()}); + writerConn.put("jdbcUrl", dataTargetCfg.getJdbcUrl()); + writerConnArr.add(writerConn); + + JSONObject writerParam = new JSONObject(); + writerParam.put("username", dataTargetCfg.getUser()); + writerParam.put("password", dataTargetCfg.getPassword()); + writerParam.put("column", + parsingSqlColumnNames(dataSource.getType(), dataTarget.getType(), dataSourceCfg, dataXParameters.getSql())); + writerParam.put("connection", writerConnArr); + + if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) { + writerParam.put("preSql", dataXParameters.getPreStatements()); + } + + if (CollectionUtils.isNotEmpty(dataXParameters.getPostStatements())) { + writerParam.put("postSql", dataXParameters.getPostStatements()); + } + + JSONObject writer = new JSONObject(); + writer.put("name", DataxUtils.getWriterPluginName(dataTarget.getType())); + writer.put("parameter", writerParam); + + List contentList = new ArrayList<>(); + JSONObject content = new JSONObject(); + content.put("reader", reader); + content.put("writer", writer); + contentList.add(content); + + return contentList; + } + + /** + * build datax setting config + * + * @return + */ + private JSONObject buildDataxJobSettingJson() { + JSONObject speed = new JSONObject(); + speed.put("channel", DATAX_CHANNEL_COUNT); + + if (dataXParameters.getJobSpeedByte() > 0) { + speed.put("byte", dataXParameters.getJobSpeedByte()); + } + + if (dataXParameters.getJobSpeedRecord() > 0) { + speed.put("record", dataXParameters.getJobSpeedRecord()); + } + + JSONObject errorLimit = new JSONObject(); + errorLimit.put("record", 0); + errorLimit.put("percentage", 0); + + JSONObject setting = new JSONObject(); + setting.put("speed", speed); + setting.put("errorLimit", errorLimit); + + return setting; + } + + private JSONObject buildDataxCoreJson() { + JSONObject speed = new JSONObject(); + speed.put("channel", DATAX_CHANNEL_COUNT); + + if (dataXParameters.getJobSpeedByte() > 0) { + speed.put("byte", dataXParameters.getJobSpeedByte()); + } + + if (dataXParameters.getJobSpeedRecord() > 0) { + speed.put("record", dataXParameters.getJobSpeedRecord()); + } + + JSONObject channel = new JSONObject(); + channel.put("speed", speed); + + JSONObject transport = new JSONObject(); + transport.put("channel", channel); + + JSONObject core = new JSONObject(); + core.put("transport", transport); + + return core; + } + + /** + * create command + * + * @return + * @throws Exception + */ + private String buildShellCommandFile(String jobConfigFilePath) + throws Exception { + // generate scripts + String fileName = String.format("%s/%s_node.sh", taskDir, taskProps.getTaskAppId()); + Path path = new File(fileName).toPath(); + + if (Files.exists(path)) { + return fileName; + } + + // datax python command + StringBuilder sbr = new StringBuilder(); + sbr.append(DATAX_PYTHON); + sbr.append(" "); + sbr.append(DATAX_HOME_EVN); + sbr.append(" "); + sbr.append(jobConfigFilePath); + String dataxCommand = sbr.toString(); + + // find process instance by task id + ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId()); + + // combining local and global parameters + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), dataXParameters.getLocalParametersMap(), + processInstance.getCmdTypeIfComplement(), processInstance.getScheduleTime()); + if (paramsMap != null) { + dataxCommand = ParameterUtils.convertParameterPlaceholders(dataxCommand, ParamUtils.convert(paramsMap)); + } + + logger.debug("raw script : {}", dataxCommand); + + // create shell command file + Set perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X); + FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms); + Files.createFile(path, attr); + Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND); + + return fileName; + } + + /** + * parsing synchronized column names in SQL statements + * + * @param dsType + * the database type of the data source + * @param dtType + * the database type of the data target + * @param dataSourceCfg + * the database connection parameters of the data source + * @param sql + * sql for data synchronization + * @return + */ + private String[] parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource dataSourceCfg, String sql) { + String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(dsType, sql); + + if (columnNames == null || columnNames.length == 0) { + logger.info("try to execute sql analysis query column name"); + columnNames = tryExecuteSqlResolveColumnNames(dataSourceCfg, sql); + } + + notNull(columnNames, String.format("parsing sql columns failed : %s", sql)); + + return DataxUtils.convertKeywordsColumns(dtType, columnNames); + } + + /** + * try grammatical parsing column + * + * @param dbType + * database type + * @param sql + * sql for data synchronization + * @return column name array + * @throws RuntimeException + */ + private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql) { + String[] columnNames; + + try { + SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql); + notNull(parser, String.format("database driver [%s] is not support", dbType.toString())); + + SQLStatement sqlStatement = parser.parseStatement(); + SQLSelectStatement sqlSelectStatement = (SQLSelectStatement)sqlStatement; + SQLSelect sqlSelect = sqlSelectStatement.getSelect(); + + List selectItemList = null; + if (sqlSelect.getQuery() instanceof SQLSelectQueryBlock) { + SQLSelectQueryBlock block = (SQLSelectQueryBlock)sqlSelect.getQuery(); + selectItemList = block.getSelectList(); + } else if (sqlSelect.getQuery() instanceof SQLUnionQuery) { + SQLUnionQuery unionQuery = (SQLUnionQuery)sqlSelect.getQuery(); + SQLSelectQueryBlock block = (SQLSelectQueryBlock)unionQuery.getRight(); + selectItemList = block.getSelectList(); + } + + notNull(selectItemList, + String.format("select query type [%s] is not support", sqlSelect.getQuery().toString())); + + columnNames = new String[selectItemList.size()]; + for (int i = 0; i < selectItemList.size(); i++ ) { + SQLSelectItem item = selectItemList.get(i); + + String columnName = null; + + if (item.getAlias() != null) { + columnName = item.getAlias(); + } else if (item.getExpr() != null) { + if (item.getExpr() instanceof SQLPropertyExpr) { + SQLPropertyExpr expr = (SQLPropertyExpr)item.getExpr(); + columnName = expr.getName(); + } else if (item.getExpr() instanceof SQLIdentifierExpr) { + SQLIdentifierExpr expr = (SQLIdentifierExpr)item.getExpr(); + columnName = expr.getName(); + } + } else { + throw new RuntimeException( + String.format("grammatical analysis sql column [ %s ] failed", item.toString())); + } + + if (columnName == null) { + throw new RuntimeException( + String.format("grammatical analysis sql column [ %s ] failed", item.toString())); + } + + columnNames[i] = columnName; + } + } + catch (Exception e) { + logger.warn(e.getMessage(), e); + return null; + } + + return columnNames; + } + + /** + * try to execute sql to resolve column names + * + * @param baseDataSource + * the database connection parameters + * @param sql + * sql for data synchronization + * @return column name array + */ + public String[] tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource, String sql) { + String[] columnNames; + sql = String.format("SELECT t.* FROM ( %s ) t WHERE 0 = 1", sql); + sql = sql.replace(";", ""); + + try ( + Connection connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), baseDataSource.getUser(), + baseDataSource.getPassword()); + PreparedStatement stmt = connection.prepareStatement(sql); + ResultSet resultSet = stmt.executeQuery()) { + + ResultSetMetaData md = resultSet.getMetaData(); + int num = md.getColumnCount(); + columnNames = new String[num]; + for (int i = 1; i <= num; i++ ) { + columnNames[i - 1] = md.getColumnName(i); + } + } + catch (SQLException e) { + logger.warn(e.getMessage(), e); + return null; + } + + return columnNames; + } + + @Override + public AbstractParameters getParameters() { + return dataXParameters; + } + + private void notNull(Object obj, String message) { + if (obj == null) { + throw new RuntimeException(message); + } + } + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java new file mode 100644 index 0000000000..2720bb8a28 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.utils; + +import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; +import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser; +import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser; +import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.junit.Assert; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * DataxUtils Tester. + */ +public class DataxUtilsTest { + + /** + * + * Method: getReaderPluginName(DbType dbType) + * + */ + @Test + public void testGetReaderPluginName() { + assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, DataxUtils.getReaderPluginName(DbType.MYSQL)); + assertEquals(DataxUtils.DATAX_READER_PLUGIN_POSTGRESQL, DataxUtils.getReaderPluginName(DbType.POSTGRESQL)); + assertEquals(DataxUtils.DATAX_READER_PLUGIN_SQLSERVER, DataxUtils.getReaderPluginName(DbType.SQLSERVER)); + assertEquals(DataxUtils.DATAX_READER_PLUGIN_ORACLE, DataxUtils.getReaderPluginName(DbType.ORACLE)); + assertTrue(DataxUtils.getReaderPluginName(DbType.DB2) == null); + } + + /** + * + * Method: getWriterPluginName(DbType dbType) + * + */ + @Test + public void testGetWriterPluginName() { + assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, DataxUtils.getWriterPluginName(DbType.MYSQL)); + assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_POSTGRESQL, DataxUtils.getWriterPluginName(DbType.POSTGRESQL)); + assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_SQLSERVER, DataxUtils.getWriterPluginName(DbType.SQLSERVER)); + assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_ORACLE, DataxUtils.getWriterPluginName(DbType.ORACLE)); + assertTrue(DataxUtils.getWriterPluginName(DbType.DB2) == null); + } + + /** + * + * Method: getSqlStatementParser(DbType dbType, String sql) + * + */ + @Test + public void testGetSqlStatementParser() throws Exception { + assertTrue(DataxUtils.getSqlStatementParser(DbType.MYSQL, "select 1") instanceof MySqlStatementParser); + assertTrue(DataxUtils.getSqlStatementParser(DbType.POSTGRESQL, "select 1") instanceof PGSQLStatementParser); + assertTrue(DataxUtils.getSqlStatementParser(DbType.ORACLE, "select 1") instanceof OracleStatementParser); + assertTrue(DataxUtils.getSqlStatementParser(DbType.SQLSERVER, "select 1") instanceof SQLServerStatementParser); + assertTrue(DataxUtils.getSqlStatementParser(DbType.DB2, "select 1") == null); + } + + /** + * + * Method: convertKeywordsColumns(DbType dbType, String[] columns) + * + */ + @Test + public void testConvertKeywordsColumns() throws Exception { + String[] fromColumns = new String[]{"`select`", "from", "\"where\"", " table "}; + String[] targetColumns = new String[]{"`select`", "`from`", "`where`", "`table`"}; + + String[] toColumns = DataxUtils.convertKeywordsColumns(DbType.MYSQL, fromColumns); + + assertTrue(fromColumns.length == toColumns.length); + + for (int i = 0; i < toColumns.length; i++) { + assertEquals(targetColumns[i], toColumns[i]); + } + } + + /** + * + * Method: doConvertKeywordsColumn(DbType dbType, String column) + * + */ + @Test + public void testDoConvertKeywordsColumn() throws Exception { + assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.MYSQL, " \"`select`\" ")); + assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.POSTGRESQL, " \"`select`\" ")); + assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.SQLSERVER, " \"`select`\" ")); + assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.ORACLE, " \"`select`\" ")); + assertEquals("select", DataxUtils.doConvertKeywordsColumn(DbType.DB2, " \"`select`\" ")); + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java new file mode 100644 index 0000000000..7a6073e05d --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.datax; + + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Date; +import java.util.List; + +import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.job.db.BaseDataSource; +import org.apache.dolphinscheduler.common.job.db.DataSourceFactory; +import org.apache.dolphinscheduler.common.utils.SpringApplicationContext; +import org.apache.dolphinscheduler.dao.ProcessDao; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.server.utils.DataxUtils; +import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; + +/** + * DataxTask Tester. + */ +public class DataxTaskTest { + + private static final Logger logger = LoggerFactory.getLogger(DataxTaskTest.class); + + private DataxTask dataxTask; + + private ProcessDao processDao; + + private ShellCommandExecutor shellCommandExecutor; + + private ApplicationContext applicationContext; + + @Before + public void before() + throws Exception { + processDao = Mockito.mock(ProcessDao.class); + shellCommandExecutor = Mockito.mock(ShellCommandExecutor.class); + + applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(ProcessDao.class)).thenReturn(processDao); + + TaskProps props = new TaskProps(); + props.setTaskDir("/tmp"); + props.setTaskAppId(String.valueOf(System.currentTimeMillis())); + props.setTaskInstId(1); + props.setTenantCode("1"); + props.setEnvFile(".dolphinscheduler_env.sh"); + props.setTaskStartTime(new Date()); + props.setTaskTimeout(0); + props.setTaskParams( + "{\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}"); + dataxTask = PowerMockito.spy(new DataxTask(props, logger)); + dataxTask.init(); + + Mockito.when(processDao.findDataSourceById(1)).thenReturn(getDataSource()); + Mockito.when(processDao.findDataSourceById(2)).thenReturn(getDataSource()); + Mockito.when(processDao.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance()); + + String fileName = String.format("%s/%s_node.sh", props.getTaskDir(), props.getTaskAppId()); + Mockito.when(shellCommandExecutor.run(fileName, processDao)).thenReturn(0); + } + + private DataSource getDataSource() { + DataSource dataSource = new DataSource(); + dataSource.setType(DbType.MYSQL); + dataSource.setConnectionParams( + "{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}"); + dataSource.setUserId(1); + return dataSource; + } + + private ProcessInstance getProcessInstance() { + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setCommandType(CommandType.START_PROCESS); + processInstance.setScheduleTime(new Date()); + return processInstance; + } + + @After + public void after() + throws Exception {} + + /** + * Method: DataxTask() + */ + @Test + public void testDataxTask() + throws Exception { + TaskProps props = new TaskProps(); + props.setTaskDir("/tmp"); + props.setTaskAppId(String.valueOf(System.currentTimeMillis())); + props.setTaskInstId(1); + props.setTenantCode("1"); + Assert.assertNotNull(new DataxTask(props, logger)); + } + + /** + * Method: init + */ + @Test + public void testInit() + throws Exception { + try { + dataxTask.init(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: handle() + */ + @Test + public void testHandle() + throws Exception { + try { + dataxTask.handle(); + } catch (RuntimeException e) { + if (e.getMessage().indexOf("process error . exitCode is : -1") < 0) { + Assert.fail(); + } + } + } + + /** + * Method: cancelApplication() + */ + @Test + public void testCancelApplication() + throws Exception { + try { + dataxTask.cancelApplication(true); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource + * dataSourceCfg, String sql) + */ + @Test + public void testParsingSqlColumnNames() + throws Exception { + try { + BaseDataSource dataSource = DataSourceFactory.getDatasource(getDataSource().getType(), + getDataSource().getConnectionParams()); + + Method method = DataxTask.class.getDeclaredMethod("parsingSqlColumnNames", DbType.class, DbType.class, BaseDataSource.class, String.class); + method.setAccessible(true); + String[] columns = (String[]) method.invoke(dataxTask, DbType.MYSQL, DbType.MYSQL, dataSource, "select 1 as a, 2 as `table` from dual"); + + Assert.assertNotNull(columns); + + Assert.assertTrue(columns.length == 2); + + Assert.assertEquals("[`a`, `table`]", Arrays.toString(columns)); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: tryGrammaticalParsingSqlColumnNames(DbType dbType, String sql) + */ + @Test + public void testTryGrammaticalAnalysisSqlColumnNames() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("tryGrammaticalAnalysisSqlColumnNames", DbType.class, String.class); + method.setAccessible(true); + String[] columns = (String[]) method.invoke(dataxTask, DbType.MYSQL, "select t1.a, t1.b from test t1 union all select a, t2.b from (select a, b from test) t2"); + + Assert.assertNotNull(columns); + + Assert.assertTrue(columns.length == 2); + + Assert.assertEquals("[a, b]", Arrays.toString(columns)); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource, + * String sql) + */ + @Test + public void testTryExecuteSqlResolveColumnNames() + throws Exception { + // TODO: Test goes here... + } + + /** + * Method: buildDataxJsonFile() + */ + @Test + public void testBuildDataxJsonFile() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile"); + method.setAccessible(true); + String filePath = (String) method.invoke(dataxTask, null); + Assert.assertNotNull(filePath); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: buildDataxJobContentJson() + */ + @Test + public void testBuildDataxJobContentJson() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildDataxJobContentJson"); + method.setAccessible(true); + List contentList = (List) method.invoke(dataxTask, null); + Assert.assertNotNull(contentList); + + JSONObject content = contentList.get(0); + JSONObject reader = (JSONObject) content.get("reader"); + Assert.assertNotNull(reader); + + String readerPluginName = (String) reader.get("name"); + Assert.assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, readerPluginName); + + JSONObject writer = (JSONObject) content.get("writer"); + Assert.assertNotNull(writer); + + String writerPluginName = (String) writer.get("name"); + Assert.assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, writerPluginName); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: buildDataxJobSettingJson() + */ + @Test + public void testBuildDataxJobSettingJson() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildDataxJobSettingJson"); + method.setAccessible(true); + JSONObject setting = (JSONObject) method.invoke(dataxTask, null); + Assert.assertNotNull(setting); + Assert.assertNotNull(setting.get("speed")); + Assert.assertNotNull(setting.get("errorLimit")); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: buildDataxCoreJson() + */ + @Test + public void testBuildDataxCoreJson() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildDataxCoreJson"); + method.setAccessible(true); + JSONObject coreConfig = (JSONObject) method.invoke(dataxTask, null); + Assert.assertNotNull(coreConfig); + Assert.assertNotNull(coreConfig.get("transport")); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: buildShellCommandFile(String jobConfigFilePath) + */ + @Test + public void testBuildShellCommandFile() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class); + method.setAccessible(true); + Assert.assertNotNull(method.invoke(dataxTask, "test.json")); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + + /** + * Method: getParameters + */ + @Test + public void testGetParameters() + throws Exception { + Assert.assertTrue(dataxTask.getParameters() != null); + } + + /** + * Method: notNull(Object obj, String message) + */ + @Test + public void testNotNull() + throws Exception { + try { + Method method = DataxTask.class.getDeclaredMethod("notNull", Object.class, String.class); + method.setAccessible(true); + method.invoke(dataxTask, "abc", "test throw RuntimeException"); + } + catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + +} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js index 62ca8103e5..e8187043bf 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js @@ -279,6 +279,10 @@ let tasksType = { 'HTTP': { desc: 'HTTP', color: '#E46F13' + }, + 'DATAX': { + desc: 'DataX', + color: '#1fc747' } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss index fbb4f418d0..420bae8c89 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss @@ -101,6 +101,9 @@ .icos-HTTP { background: url("../img/toobar_HTTP.png") no-repeat 50% 50%; } + .icos-DATAX { + background: url("../img/toolbar_DATAX.png") no-repeat 50% 50%; + } .toolbar { width: 60px; height: 100%; diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue index 3f009eb75c..682dd5b51a 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue @@ -191,7 +191,13 @@ ref="HTTP" :backfill-item="backfillItem"> - + +
@@ -216,6 +222,7 @@ import mProcedure from './tasks/procedure' import mDependent from './tasks/dependent' import mHttp from './tasks/http' + import mDatax from './tasks/datax' import mSubProcess from './tasks/sub_process' import mSelectInput from './_source/selectInput' import mTimeoutAlarm from './_source/timeoutAlarm' @@ -565,6 +572,7 @@ mPython, mDependent, mHttp, + mDatax, mSelectInput, mTimeoutAlarm, mPriority, diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue new file mode 100755 index 0000000000..ce918f49cf --- /dev/null +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue @@ -0,0 +1,292 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + + diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_DATAX.png b/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_DATAX.png new file mode 100644 index 0000000000000000000000000000000000000000..b59c7592081977f8ed36359df9a4467ce9d22f66 GIT binary patch literal 571 zcmeAS@N?(olHy`uVBq!ia0vp^3LwnF3?v&v(vJfvmUKs7M+SzC{oH>NS%G}U;vjb? zhIQv;UIIC)C9V-ADTyViR=N2pnQ4^_Mg~R(x&}tN1|}f}7FGu4R)&_^1_o9J1~w}t z6HzoI7o`IA;nv{$W#L_*2CV>}5LaXO^X>&cW?mP73=r~wk!TF}^OK^dRReYLl?3?( z|7YO#eSPQW@#1#%sVnC^-mq&T(DlZiE{-7@=aU6km=tddv9WzW-KAj0+;RUD1Djx1 z0HZLQMnM~ghmnfIj~pK1gG}r<60{^L9GE6?*f}3^SQKZS$uv&Wh8ss2c{p?CY>HVe*%ju&6gy?ZMurTA)f*m*v@Kxm zz9Fuc9u^atwB%sI6^UaHwj8)ny*1#O)8h+M+t~TIHaD_xMTm)+uo-Od&Ec5D!rh`1 zqLZQHkr8uaL!e~F$0pY^Nxcq?c{2~Rg*vo4YOBngrmr*O#yQ2y7OtBLn4G63Ft<-g zbexi~bHT%$!evi+!kr`}yds4oP6;21jAe@0E6Ee^_t2RITzu9XY;CCm%f1^jaVT>b zGDm)3Xgc3)yQm>>Cd0<-lHn=MlbNHfR4f@7Vv@a1Z#T7i1PoXPPgg&ebxsLQ0Dm98 A_y7O^ literal 0 HcmV?d00001 diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_ETL.png b/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_ETL.png new file mode 100755 index 0000000000000000000000000000000000000000..913af420c6c76ed051ecf44f27b522419fea7ecc GIT binary patch literal 3117 zcmZ{mc{J2*8^?c%Axj}4*&_-wW8agVvBZ!qWf{Xv%#a!To{+L+k0N3SHCY;iGK6?! ziwY&?5pT*e*^T`jo%fu#_c`Z1zjNK!`F+2i>wDjSUdcArrd;eo>;M38nVT8G8D0Fa z*g%XE7aqRGXv{wPR{8+&nfYto7p6lKVFtGXfbeqw5PKT{wi#0F4*&>)0>GR*0BAh` z0D*u<&9*uW1LSF8Y6Kh{e#I>nSqz00Z*FDGI(_6A6Ps$>ExlC$I96(Iq;K!kQJ5Q% zBJwG!ThGUvwiC5U>z!1OzArScq0w+2ZD938F#gmf+YA>W(u_n#88(}{#L1rRJS_eT zjDV>c&NM=s!_Ckv=a0F=S?-M3bYDLzG4|^E2J+YZFiZfY?LG5fTPH$+_Ji~8frun4>sN9;?U}N5jEP}O|=+Z>%+X&-BR<3ewJozT}H)RfV08q>d8j9eN4FoqY>^C%ntci|FrZXE-h& z(9Hu23w7{6;x<{AIM+{}EfNQg2 zxH5Z0JjY<3!?Rj$6jqXLWS*SOOW;dH3k^B>L3vXrcfVO2tar(5EI!=Yn(9aq(GCxa zP2Z5zx&kHaz@QB5-OUWRQ6GpFjS@RG~2*DAydGJkWH_SB76~dImw<-hiVW# zEwbnA6XK}K&8c{EZYE53IxyDqT(6t%c1ROtpEB6(S-UjHzFt+ck}=%9sjEZnZjpWc zDWGLlf*_gI9|`3>Cf0cHw}4_N;S~iWo&(G5_XeF@rr+V7y^#9J1{DQfcOg42?t~VF zBtMS^O|YdHRRzC; zfLs?@yLF@^VZ@;laC~LTLGE`|nHtrfrrrcy?fnZ`BH`C1` z&`Sr@M8gxZ%gLgo*@!Y;#d9(;`T2!8`IaWmxj{hyfzRO1$V%X3#CE=qrdlwQv~=7h zpjS!J>ZpgSEa&r2VG*#(`;_#M0f{M_`+?{Lx7XVibL-(9b$z*Er`rk?#z?*b#U0A^ zI7Ud(V+x(O!uoO@VkdZeWZk`BBw^a_v34TKpr=93mD+)6VpGAr)v>etz<19$v&ZY{ zXFp~zc}gg@h=v6eb#x@tUhh*wi@N(3PUo%|^Db+Awk*J3FqY!&Qc);ds})HKZ!sr1 zl)5~BlHFcs>7tMaZ?ZI1Gra5w#;+7ooht3;<^X@n);XA5&}d=E&FLatVeN}18p2&r z=7wpga!8`x^huS6TwfhH265vXT(`}nvZU?s?*6Wqn$a1rT zUK|eyjuxeEhHd3IHto=;XEQ7V7shT1k%W5e6TOto)gEF-@A-*YCZgT*`5UU8Riu4do$nQecve~rgdlcVKC z+vi#0qd4CeE&LL`f)rvp=z2`4 zfm)V0+n7~F)UBg1-?_A&NOW=a0!H-ZTq>M46UDbr=G+|;qgwb75mI@*6ZP4HSIg11 zU`Hn~UoGaLjjW}-=-DF6%mJ?&WJ*kohme9Q+;C#KlFX+~-mrW2MSiAw7Ob4vP`z`t zcK`90`HSCcexC5y)*JD5_w-~+bs$eRyUB5VsK*&SQPiOlY~<@|8qIodF1ZT0!LI9x zNV(M=Bk(hw z&L%O%wdw9`L`k}jAGff$=j0HJk~h&2mB}!>Zzcu3b@TduP&}`q%TFvwoh$H1(Y44e zJMFQi%7-OEXzZk>r7?>}aC!1H%p&PY@7z$*@zpNQ6uls`7TW>Sq2 zo7g%L&3AHg_SL7d;Sb8qWrYn2Y@;;=ibWT_W{!63_C8+t-o`cmqu+>)W!z3i^2ob} z?>;BhtVCkn%X+leD{RsRdt^qjakD$J(zBRxu<#je9=R*Nel0XDZ1c~K?03@d{-q=* zC--b7uX0NtU!Lw>l^4%yGNze5Lt19KET-EbX zW>XoQdquz1KAgZL$?Q6%#z*X!nvFJ)FM3dq*{PR%+g_gob7of=LS@@)St;d63a7b0 zIH%pt;pLT6d$&yHl)LYMrxd1UuRvZq(atrCKc!6K^hY{e=gqAcS2Ty$;ydtz{QM~d znKnB!?*Bpkee1We4UVGufkvXXxVQ)#^D{FQ1IY}j!hzwOg zm1OTca2Krk_isI!|hD4Q%imOnaO^lAnyaoNk%eSN#6zU5jaYWjqy_G)_5MG^jzgo3J6>n&UbEe%XOz@}nGWkUBWokP8r z+H+e4+HQi$;A5seffuQ^`NC;!U2=w>IL$JsY92)=oAB^**/dao/mapper/CommandMapperTest.java **/alert/template/AlertTemplateFactoryTest.java **/alert/template/impl/DefaultHTMLTemplateTest.java + **/server/worker/task/datax/DataxTaskTest.java + **/server/utils/DataxUtilsTest.java