Browse Source

[feature] data synchronization function (#1881)

* feature:data synchronization function

* feature:data synchronization function

* feature:data synchronization function

* fix:add license

* fix:resource close bug

* fix:remove class author

* fix:Improve test coverage

* fix:add UT

* fix:add UT

* fix:data sync node renamed to datax

* fix:add UT

* fix: UT bug

* fix:Optimize variable name

* fix:pom

* fix:Variable name error

* fix:optimize the code
pull/2/head
魔方不在手 5 years ago committed by GitHub
parent
commit
f942e5ced3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh
  2. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java
  3. 192
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java
  4. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java
  5. 129
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java
  6. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
  7. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java
  8. 522
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
  9. 108
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java
  10. 352
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java
  11. 4
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js
  12. 3
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss
  13. 10
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue
  14. 292
      dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue
  15. BIN
      dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_DATAX.png
  16. BIN
      dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_ETL.png
  17. 10
      dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
  18. 10
      dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
  19. 2
      pom.xml

1
dockerfile/conf/dolphinscheduler/conf/env/.dolphinscheduler_env.sh vendored

@ -18,3 +18,4 @@
export PYTHON_HOME=/usr/bin/python export PYTHON_HOME=/usr/bin/python
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH=$PYTHON_HOME:$JAVA_HOME/bin:$PATH export PATH=$PYTHON_HOME:$JAVA_HOME/bin:$PATH
export DATAX_HOME=/opt/datax/bin/datax.py

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java

@ -33,6 +33,7 @@ public enum TaskType {
* 7 DEPENDENT * 7 DEPENDENT
* 8 FLINK * 8 FLINK
* 9 HTTP * 9 HTTP
* 10 DATAX
*/ */
SHELL(0, "shell"), SHELL(0, "shell"),
SQL(1, "sql"), SQL(1, "sql"),
@ -43,7 +44,8 @@ public enum TaskType {
PYTHON(6, "python"), PYTHON(6, "python"),
DEPENDENT(7, "dependent"), DEPENDENT(7, "dependent"),
FLINK(8, "flink"), FLINK(8, "flink"),
HTTP(9, "http"); HTTP(9, "http"),
DATAX(10, "datax");
TaskType(int code, String descp){ TaskType(int code, String descp){
this.code = code; this.code = code;

192
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/datax/DataxParameters.java

@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.task.datax;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
/**
* DataX parameter
*/
public class DataxParameters extends AbstractParameters {
/**
* data source typeeg MYSQL, POSTGRES ...
*/
private String dsType;
/**
* datasource id
*/
private int dataSource;
/**
* data target typeeg MYSQL, POSTGRES ...
*/
private String dtType;
/**
* datatarget id
*/
private int dataTarget;
/**
* sql
*/
private String sql;
/**
* target table
*/
private String targetTable;
/**
* Pre Statements
*/
private List<String> preStatements;
/**
* Post Statements
*/
private List<String> postStatements;
/**
* speed byte num
*/
private int jobSpeedByte;
/**
* speed record count
*/
private int jobSpeedRecord;
public String getDsType() {
return dsType;
}
public void setDsType(String dsType) {
this.dsType = dsType;
}
public int getDataSource() {
return dataSource;
}
public void setDataSource(int dataSource) {
this.dataSource = dataSource;
}
public String getDtType() {
return dtType;
}
public void setDtType(String dtType) {
this.dtType = dtType;
}
public int getDataTarget() {
return dataTarget;
}
public void setDataTarget(int dataTarget) {
this.dataTarget = dataTarget;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public String getTargetTable() {
return targetTable;
}
public void setTargetTable(String targetTable) {
this.targetTable = targetTable;
}
public List<String> getPreStatements() {
return preStatements;
}
public void setPreStatements(List<String> preStatements) {
this.preStatements = preStatements;
}
public List<String> getPostStatements() {
return postStatements;
}
public void setPostStatements(List<String> postStatements) {
this.postStatements = postStatements;
}
public int getJobSpeedByte() {
return jobSpeedByte;
}
public void setJobSpeedByte(int jobSpeedByte) {
this.jobSpeedByte = jobSpeedByte;
}
public int getJobSpeedRecord() {
return jobSpeedRecord;
}
public void setJobSpeedRecord(int jobSpeedRecord) {
this.jobSpeedRecord = jobSpeedRecord;
}
@Override
public boolean checkParameters() {
if (!(dataSource != 0
&& dataTarget != 0
&& StringUtils.isNotEmpty(sql)
&& StringUtils.isNotEmpty(targetTable))) {
return false;
}
return true;
}
@Override
public List<String> getResourceFilesList() {
return new ArrayList<>();
}
@Override
public String toString() {
return "DataxParameters{" +
"dsType='" + dsType + '\'' +
", dataSource=" + dataSource +
", dtType='" + dtType + '\'' +
", dataTarget=" + dataTarget +
", sql='" + sql + '\'' +
", targetTable='" + targetTable + '\'' +
", preStatements=" + preStatements +
", postStatements=" + postStatements +
", jobSpeedByte=" + jobSpeedByte +
", jobSpeedRecord=" + jobSpeedRecord +
'}';
}
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters; import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.task.http.HttpParameters; import org.apache.dolphinscheduler.common.task.http.HttpParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
@ -68,6 +69,8 @@ public class TaskParametersUtils {
return JSONUtils.parseObject(parameter, FlinkParameters.class); return JSONUtils.parseObject(parameter, FlinkParameters.class);
case HTTP: case HTTP:
return JSONUtils.parseObject(parameter, HttpParameters.class); return JSONUtils.parseObject(parameter, HttpParameters.class);
case DATAX:
return JSONUtils.parseObject(parameter, DataxParameters.class);
default: default:
return null; return null;
} }

129
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/DataxUtils.java

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.common.enums.DbType;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser;
import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser;
import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser;
import com.alibaba.druid.sql.parser.SQLStatementParser;
public class DataxUtils {
public static final String DATAX_READER_PLUGIN_MYSQL = "mysqlreader";
public static final String DATAX_READER_PLUGIN_POSTGRESQL = "postgresqlreader";
public static final String DATAX_READER_PLUGIN_ORACLE = "oraclereader";
public static final String DATAX_READER_PLUGIN_SQLSERVER = "sqlserverreader";
public static final String DATAX_WRITER_PLUGIN_MYSQL = "mysqlwriter";
public static final String DATAX_WRITER_PLUGIN_POSTGRESQL = "postgresqlwriter";
public static final String DATAX_WRITER_PLUGIN_ORACLE = "oraclewriter";
public static final String DATAX_WRITER_PLUGIN_SQLSERVER = "sqlserverwriter";
public static String getReaderPluginName(DbType dbType) {
switch (dbType) {
case MYSQL:
return DATAX_READER_PLUGIN_MYSQL;
case POSTGRESQL:
return DATAX_READER_PLUGIN_POSTGRESQL;
case ORACLE:
return DATAX_READER_PLUGIN_ORACLE;
case SQLSERVER:
return DATAX_READER_PLUGIN_SQLSERVER;
default:
return null;
}
}
public static String getWriterPluginName(DbType dbType) {
switch (dbType) {
case MYSQL:
return DATAX_WRITER_PLUGIN_MYSQL;
case POSTGRESQL:
return DATAX_WRITER_PLUGIN_POSTGRESQL;
case ORACLE:
return DATAX_WRITER_PLUGIN_ORACLE;
case SQLSERVER:
return DATAX_WRITER_PLUGIN_SQLSERVER;
default:
return null;
}
}
public static SQLStatementParser getSqlStatementParser(DbType dbType, String sql) {
switch (dbType) {
case MYSQL:
return new MySqlStatementParser(sql);
case POSTGRESQL:
return new PGSQLStatementParser(sql);
case ORACLE:
return new OracleStatementParser(sql);
case SQLSERVER:
return new SQLServerStatementParser(sql);
default:
return null;
}
}
public static String[] convertKeywordsColumns(DbType dbType, String[] columns) {
if (columns == null) {
return null;
}
String[] toColumns = new String[columns.length];
for (int i = 0; i < columns.length; i++ ) {
toColumns[i] = doConvertKeywordsColumn(dbType, columns[i]);
}
return toColumns;
}
public static String doConvertKeywordsColumn(DbType dbType, String column) {
if (column == null) {
return column;
}
column = column.trim();
column = column.replace("`", "");
column = column.replace("\"", "");
column = column.replace("'", "");
switch (dbType) {
case MYSQL:
return String.format("`%s`", column);
case POSTGRESQL:
return String.format("\"%s\"", column);
case ORACLE:
return String.format("\"%s\"", column);
case SQLSERVER:
return String.format("`%s`", column);
default:
return column;
}
}
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java

@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.flink.FlinkParameters; import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters; import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
@ -194,6 +195,9 @@ public abstract class AbstractTask {
case PYTHON: case PYTHON:
paramsClass = PythonParameters.class; paramsClass = PythonParameters.class;
break; break;
case DATAX:
paramsClass = DataxParameters.class;
break;
default: default:
logger.error("not support this task type: {}", taskType); logger.error("not support this task type: {}", taskType);
throw new IllegalArgumentException("not support this task type"); throw new IllegalArgumentException("not support this task type");

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.EnumUtils; import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.server.worker.task.dependent.DependentTask; import org.apache.dolphinscheduler.server.worker.task.dependent.DependentTask;
import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask;
import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask; import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask;
import org.apache.dolphinscheduler.server.worker.task.http.HttpTask; import org.apache.dolphinscheduler.server.worker.task.http.HttpTask;
import org.apache.dolphinscheduler.server.worker.task.mr.MapReduceTask; import org.apache.dolphinscheduler.server.worker.task.mr.MapReduceTask;
@ -65,6 +66,8 @@ public class TaskManager {
return new DependentTask(props, logger); return new DependentTask(props, logger);
case HTTP: case HTTP:
return new HttpTask(props, logger); return new HttpTask(props, logger);
case DATAX:
return new DataxTask(props, logger);
default: default:
logger.error("unsupport task type: {}", taskType); logger.error("unsupport task type: {}", taskType);
throw new IllegalArgumentException("not support task type"); throw new IllegalArgumentException("not support task type");

522
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java

@ -0,0 +1,522 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.datax;
import java.io.File;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.job.db.BaseDataSource;
import org.apache.dolphinscheduler.common.job.db.DataSourceFactory;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.DataxUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelect;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.alibaba.fastjson.JSONObject;
/**
* DataX task
*/
public class DataxTask extends AbstractTask {
/**
* python process(datax only supports version 2.7 by default)
*/
private static final String DATAX_PYTHON = "python2.7";
/**
* datax home path
*/
private static final String DATAX_HOME_EVN = "${DATAX_HOME}";
/**
* datax channel count
*/
private static final int DATAX_CHANNEL_COUNT = 1;
/**
* datax parameters
*/
private DataxParameters dataXParameters;
/**
* task dir
*/
private String taskDir;
/**
* shell command executor
*/
private ShellCommandExecutor shellCommandExecutor;
/**
* process database access
*/
private ProcessDao processDao;
/**
* constructor
*
* @param props
* props
* @param logger
* logger
*/
public DataxTask(TaskProps props, Logger logger) {
super(props, logger);
this.taskDir = props.getTaskDir();
logger.info("task dir : {}", taskDir);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, props.getTaskDir(), props.getTaskAppId(),
props.getTaskInstId(), props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(),
props.getTaskTimeout(), logger);
this.processDao = SpringApplicationContext.getBean(ProcessDao.class);
}
/**
* init DataX config
*/
@Override
public void init() {
logger.info("datax task params {}", taskProps.getTaskParams());
dataXParameters = JSONUtils.parseObject(taskProps.getTaskParams(), DataxParameters.class);
if (!dataXParameters.checkParameters()) {
throw new RuntimeException("datax task params is not valid");
}
}
/**
* run DataX process
*
* @throws Exception
*/
@Override
public void handle()
throws Exception {
try {
// set the name of the current thread
String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskProps.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
// run datax process
String jsonFilePath = buildDataxJsonFile();
String shellCommandFilePath = buildShellCommandFile(jsonFilePath);
exitStatusCode = shellCommandExecutor.run(shellCommandFilePath, processDao);
}
catch (Exception e) {
exitStatusCode = -1;
throw e;
}
}
/**
* cancel DataX process
*
* @param cancelApplication
* @throws Exception
*/
@Override
public void cancelApplication(boolean cancelApplication)
throws Exception {
// cancel process
shellCommandExecutor.cancelApplication();
}
/**
* build datax configuration file
*
* @return
* @throws Exception
*/
private String buildDataxJsonFile()
throws Exception {
// generate json
String fileName = String.format("%s/%s_job.json", taskDir, taskProps.getTaskAppId());
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
JSONObject job = new JSONObject();
job.put("content", buildDataxJobContentJson());
job.put("setting", buildDataxJobSettingJson());
JSONObject root = new JSONObject();
root.put("job", job);
root.put("core", buildDataxCoreJson());
logger.debug("datax job json : {}", root.toString());
// create datax json file
FileUtils.writeStringToFile(new File(fileName), root.toString(), Charset.forName("UTF-8"));
return fileName;
}
/**
* build datax job config
*
* @return
* @throws SQLException
*/
private List<JSONObject> buildDataxJobContentJson()
throws SQLException {
DataSource dataSource = processDao.findDataSourceById(dataXParameters.getDataSource());
BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(dataSource.getType(),
dataSource.getConnectionParams());
DataSource dataTarget = processDao.findDataSourceById(dataXParameters.getDataTarget());
BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(dataTarget.getType(),
dataTarget.getConnectionParams());
List<JSONObject> readerConnArr = new ArrayList<>();
JSONObject readerConn = new JSONObject();
readerConn.put("querySql", new String[] {dataXParameters.getSql()});
readerConn.put("jdbcUrl", new String[] {dataSourceCfg.getJdbcUrl()});
readerConnArr.add(readerConn);
JSONObject readerParam = new JSONObject();
readerParam.put("username", dataSourceCfg.getUser());
readerParam.put("password", dataSourceCfg.getPassword());
readerParam.put("connection", readerConnArr);
JSONObject reader = new JSONObject();
reader.put("name", DataxUtils.getReaderPluginName(dataSource.getType()));
reader.put("parameter", readerParam);
List<JSONObject> writerConnArr = new ArrayList<>();
JSONObject writerConn = new JSONObject();
writerConn.put("table", new String[] {dataXParameters.getTargetTable()});
writerConn.put("jdbcUrl", dataTargetCfg.getJdbcUrl());
writerConnArr.add(writerConn);
JSONObject writerParam = new JSONObject();
writerParam.put("username", dataTargetCfg.getUser());
writerParam.put("password", dataTargetCfg.getPassword());
writerParam.put("column",
parsingSqlColumnNames(dataSource.getType(), dataTarget.getType(), dataSourceCfg, dataXParameters.getSql()));
writerParam.put("connection", writerConnArr);
if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) {
writerParam.put("preSql", dataXParameters.getPreStatements());
}
if (CollectionUtils.isNotEmpty(dataXParameters.getPostStatements())) {
writerParam.put("postSql", dataXParameters.getPostStatements());
}
JSONObject writer = new JSONObject();
writer.put("name", DataxUtils.getWriterPluginName(dataTarget.getType()));
writer.put("parameter", writerParam);
List<JSONObject> contentList = new ArrayList<>();
JSONObject content = new JSONObject();
content.put("reader", reader);
content.put("writer", writer);
contentList.add(content);
return contentList;
}
/**
* build datax setting config
*
* @return
*/
private JSONObject buildDataxJobSettingJson() {
JSONObject speed = new JSONObject();
speed.put("channel", DATAX_CHANNEL_COUNT);
if (dataXParameters.getJobSpeedByte() > 0) {
speed.put("byte", dataXParameters.getJobSpeedByte());
}
if (dataXParameters.getJobSpeedRecord() > 0) {
speed.put("record", dataXParameters.getJobSpeedRecord());
}
JSONObject errorLimit = new JSONObject();
errorLimit.put("record", 0);
errorLimit.put("percentage", 0);
JSONObject setting = new JSONObject();
setting.put("speed", speed);
setting.put("errorLimit", errorLimit);
return setting;
}
private JSONObject buildDataxCoreJson() {
JSONObject speed = new JSONObject();
speed.put("channel", DATAX_CHANNEL_COUNT);
if (dataXParameters.getJobSpeedByte() > 0) {
speed.put("byte", dataXParameters.getJobSpeedByte());
}
if (dataXParameters.getJobSpeedRecord() > 0) {
speed.put("record", dataXParameters.getJobSpeedRecord());
}
JSONObject channel = new JSONObject();
channel.put("speed", speed);
JSONObject transport = new JSONObject();
transport.put("channel", channel);
JSONObject core = new JSONObject();
core.put("transport", transport);
return core;
}
/**
* create command
*
* @return
* @throws Exception
*/
private String buildShellCommandFile(String jobConfigFilePath)
throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.sh", taskDir, taskProps.getTaskAppId());
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
// datax python command
StringBuilder sbr = new StringBuilder();
sbr.append(DATAX_PYTHON);
sbr.append(" ");
sbr.append(DATAX_HOME_EVN);
sbr.append(" ");
sbr.append(jobConfigFilePath);
String dataxCommand = sbr.toString();
// find process instance by task id
ProcessInstance processInstance = processDao.findProcessInstanceByTaskId(taskProps.getTaskInstId());
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(), dataXParameters.getLocalParametersMap(),
processInstance.getCmdTypeIfComplement(), processInstance.getScheduleTime());
if (paramsMap != null) {
dataxCommand = ParameterUtils.convertParameterPlaceholders(dataxCommand, ParamUtils.convert(paramsMap));
}
logger.debug("raw script : {}", dataxCommand);
// create shell command file
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
Files.createFile(path, attr);
Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND);
return fileName;
}
/**
* parsing synchronized column names in SQL statements
*
* @param dsType
* the database type of the data source
* @param dtType
* the database type of the data target
* @param dataSourceCfg
* the database connection parameters of the data source
* @param sql
* sql for data synchronization
* @return
*/
private String[] parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource dataSourceCfg, String sql) {
String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(dsType, sql);
if (columnNames == null || columnNames.length == 0) {
logger.info("try to execute sql analysis query column name");
columnNames = tryExecuteSqlResolveColumnNames(dataSourceCfg, sql);
}
notNull(columnNames, String.format("parsing sql columns failed : %s", sql));
return DataxUtils.convertKeywordsColumns(dtType, columnNames);
}
/**
* try grammatical parsing column
*
* @param dbType
* database type
* @param sql
* sql for data synchronization
* @return column name array
* @throws RuntimeException
*/
private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql) {
String[] columnNames;
try {
SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql);
notNull(parser, String.format("database driver [%s] is not support", dbType.toString()));
SQLStatement sqlStatement = parser.parseStatement();
SQLSelectStatement sqlSelectStatement = (SQLSelectStatement)sqlStatement;
SQLSelect sqlSelect = sqlSelectStatement.getSelect();
List<SQLSelectItem> selectItemList = null;
if (sqlSelect.getQuery() instanceof SQLSelectQueryBlock) {
SQLSelectQueryBlock block = (SQLSelectQueryBlock)sqlSelect.getQuery();
selectItemList = block.getSelectList();
} else if (sqlSelect.getQuery() instanceof SQLUnionQuery) {
SQLUnionQuery unionQuery = (SQLUnionQuery)sqlSelect.getQuery();
SQLSelectQueryBlock block = (SQLSelectQueryBlock)unionQuery.getRight();
selectItemList = block.getSelectList();
}
notNull(selectItemList,
String.format("select query type [%s] is not support", sqlSelect.getQuery().toString()));
columnNames = new String[selectItemList.size()];
for (int i = 0; i < selectItemList.size(); i++ ) {
SQLSelectItem item = selectItemList.get(i);
String columnName = null;
if (item.getAlias() != null) {
columnName = item.getAlias();
} else if (item.getExpr() != null) {
if (item.getExpr() instanceof SQLPropertyExpr) {
SQLPropertyExpr expr = (SQLPropertyExpr)item.getExpr();
columnName = expr.getName();
} else if (item.getExpr() instanceof SQLIdentifierExpr) {
SQLIdentifierExpr expr = (SQLIdentifierExpr)item.getExpr();
columnName = expr.getName();
}
} else {
throw new RuntimeException(
String.format("grammatical analysis sql column [ %s ] failed", item.toString()));
}
if (columnName == null) {
throw new RuntimeException(
String.format("grammatical analysis sql column [ %s ] failed", item.toString()));
}
columnNames[i] = columnName;
}
}
catch (Exception e) {
logger.warn(e.getMessage(), e);
return null;
}
return columnNames;
}
/**
* try to execute sql to resolve column names
*
* @param baseDataSource
* the database connection parameters
* @param sql
* sql for data synchronization
* @return column name array
*/
public String[] tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource, String sql) {
String[] columnNames;
sql = String.format("SELECT t.* FROM ( %s ) t WHERE 0 = 1", sql);
sql = sql.replace(";", "");
try (
Connection connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(), baseDataSource.getUser(),
baseDataSource.getPassword());
PreparedStatement stmt = connection.prepareStatement(sql);
ResultSet resultSet = stmt.executeQuery()) {
ResultSetMetaData md = resultSet.getMetaData();
int num = md.getColumnCount();
columnNames = new String[num];
for (int i = 1; i <= num; i++ ) {
columnNames[i - 1] = md.getColumnName(i);
}
}
catch (SQLException e) {
logger.warn(e.getMessage(), e);
return null;
}
return columnNames;
}
@Override
public AbstractParameters getParameters() {
return dataXParameters;
}
private void notNull(Object obj, String message) {
if (obj == null) {
throw new RuntimeException(message);
}
}
}

108
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/DataxUtilsTest.java

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.dialect.oracle.parser.OracleStatementParser;
import com.alibaba.druid.sql.dialect.postgresql.parser.PGSQLStatementParser;
import com.alibaba.druid.sql.dialect.sqlserver.parser.SQLServerStatementParser;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* DataxUtils Tester.
*/
public class DataxUtilsTest {
/**
*
* Method: getReaderPluginName(DbType dbType)
*
*/
@Test
public void testGetReaderPluginName() {
assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, DataxUtils.getReaderPluginName(DbType.MYSQL));
assertEquals(DataxUtils.DATAX_READER_PLUGIN_POSTGRESQL, DataxUtils.getReaderPluginName(DbType.POSTGRESQL));
assertEquals(DataxUtils.DATAX_READER_PLUGIN_SQLSERVER, DataxUtils.getReaderPluginName(DbType.SQLSERVER));
assertEquals(DataxUtils.DATAX_READER_PLUGIN_ORACLE, DataxUtils.getReaderPluginName(DbType.ORACLE));
assertTrue(DataxUtils.getReaderPluginName(DbType.DB2) == null);
}
/**
*
* Method: getWriterPluginName(DbType dbType)
*
*/
@Test
public void testGetWriterPluginName() {
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, DataxUtils.getWriterPluginName(DbType.MYSQL));
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_POSTGRESQL, DataxUtils.getWriterPluginName(DbType.POSTGRESQL));
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_SQLSERVER, DataxUtils.getWriterPluginName(DbType.SQLSERVER));
assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_ORACLE, DataxUtils.getWriterPluginName(DbType.ORACLE));
assertTrue(DataxUtils.getWriterPluginName(DbType.DB2) == null);
}
/**
*
* Method: getSqlStatementParser(DbType dbType, String sql)
*
*/
@Test
public void testGetSqlStatementParser() throws Exception {
assertTrue(DataxUtils.getSqlStatementParser(DbType.MYSQL, "select 1") instanceof MySqlStatementParser);
assertTrue(DataxUtils.getSqlStatementParser(DbType.POSTGRESQL, "select 1") instanceof PGSQLStatementParser);
assertTrue(DataxUtils.getSqlStatementParser(DbType.ORACLE, "select 1") instanceof OracleStatementParser);
assertTrue(DataxUtils.getSqlStatementParser(DbType.SQLSERVER, "select 1") instanceof SQLServerStatementParser);
assertTrue(DataxUtils.getSqlStatementParser(DbType.DB2, "select 1") == null);
}
/**
*
* Method: convertKeywordsColumns(DbType dbType, String[] columns)
*
*/
@Test
public void testConvertKeywordsColumns() throws Exception {
String[] fromColumns = new String[]{"`select`", "from", "\"where\"", " table "};
String[] targetColumns = new String[]{"`select`", "`from`", "`where`", "`table`"};
String[] toColumns = DataxUtils.convertKeywordsColumns(DbType.MYSQL, fromColumns);
assertTrue(fromColumns.length == toColumns.length);
for (int i = 0; i < toColumns.length; i++) {
assertEquals(targetColumns[i], toColumns[i]);
}
}
/**
*
* Method: doConvertKeywordsColumn(DbType dbType, String column)
*
*/
@Test
public void testDoConvertKeywordsColumn() throws Exception {
assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.MYSQL, " \"`select`\" "));
assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.POSTGRESQL, " \"`select`\" "));
assertEquals("`select`", DataxUtils.doConvertKeywordsColumn(DbType.SQLSERVER, " \"`select`\" "));
assertEquals("\"select\"", DataxUtils.doConvertKeywordsColumn(DbType.ORACLE, " \"`select`\" "));
assertEquals("select", DataxUtils.doConvertKeywordsColumn(DbType.DB2, " \"`select`\" "));
}
}

352
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTaskTest.java

@ -0,0 +1,352 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.datax;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.job.db.BaseDataSource;
import org.apache.dolphinscheduler.common.job.db.DataSourceFactory;
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.DataxUtils;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
/**
* DataxTask Tester.
*/
public class DataxTaskTest {
private static final Logger logger = LoggerFactory.getLogger(DataxTaskTest.class);
private DataxTask dataxTask;
private ProcessDao processDao;
private ShellCommandExecutor shellCommandExecutor;
private ApplicationContext applicationContext;
@Before
public void before()
throws Exception {
processDao = Mockito.mock(ProcessDao.class);
shellCommandExecutor = Mockito.mock(ShellCommandExecutor.class);
applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessDao.class)).thenReturn(processDao);
TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date());
props.setTaskTimeout(0);
props.setTaskParams(
"{\"targetTable\":\"test\",\"postStatements\":[],\"jobSpeedByte\":1024,\"jobSpeedRecord\":1000,\"dtType\":\"MYSQL\",\"datasource\":1,\"dsType\":\"MYSQL\",\"datatarget\":2,\"jobSpeedByte\":0,\"sql\":\"select 1 as test from dual\",\"preStatements\":[\"delete from test\"],\"postStatements\":[\"delete from test\"]}");
dataxTask = PowerMockito.spy(new DataxTask(props, logger));
dataxTask.init();
Mockito.when(processDao.findDataSourceById(1)).thenReturn(getDataSource());
Mockito.when(processDao.findDataSourceById(2)).thenReturn(getDataSource());
Mockito.when(processDao.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance());
String fileName = String.format("%s/%s_node.sh", props.getTaskDir(), props.getTaskAppId());
Mockito.when(shellCommandExecutor.run(fileName, processDao)).thenReturn(0);
}
private DataSource getDataSource() {
DataSource dataSource = new DataSource();
dataSource.setType(DbType.MYSQL);
dataSource.setConnectionParams(
"{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}");
dataSource.setUserId(1);
return dataSource;
}
private ProcessInstance getProcessInstance() {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setCommandType(CommandType.START_PROCESS);
processInstance.setScheduleTime(new Date());
return processInstance;
}
@After
public void after()
throws Exception {}
/**
* Method: DataxTask()
*/
@Test
public void testDataxTask()
throws Exception {
TaskProps props = new TaskProps();
props.setTaskDir("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1);
props.setTenantCode("1");
Assert.assertNotNull(new DataxTask(props, logger));
}
/**
* Method: init
*/
@Test
public void testInit()
throws Exception {
try {
dataxTask.init();
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: handle()
*/
@Test
public void testHandle()
throws Exception {
try {
dataxTask.handle();
} catch (RuntimeException e) {
if (e.getMessage().indexOf("process error . exitCode is : -1") < 0) {
Assert.fail();
}
}
}
/**
* Method: cancelApplication()
*/
@Test
public void testCancelApplication()
throws Exception {
try {
dataxTask.cancelApplication(true);
} catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: parsingSqlColumnNames(DbType dsType, DbType dtType, BaseDataSource
* dataSourceCfg, String sql)
*/
@Test
public void testParsingSqlColumnNames()
throws Exception {
try {
BaseDataSource dataSource = DataSourceFactory.getDatasource(getDataSource().getType(),
getDataSource().getConnectionParams());
Method method = DataxTask.class.getDeclaredMethod("parsingSqlColumnNames", DbType.class, DbType.class, BaseDataSource.class, String.class);
method.setAccessible(true);
String[] columns = (String[]) method.invoke(dataxTask, DbType.MYSQL, DbType.MYSQL, dataSource, "select 1 as a, 2 as `table` from dual");
Assert.assertNotNull(columns);
Assert.assertTrue(columns.length == 2);
Assert.assertEquals("[`a`, `table`]", Arrays.toString(columns));
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: tryGrammaticalParsingSqlColumnNames(DbType dbType, String sql)
*/
@Test
public void testTryGrammaticalAnalysisSqlColumnNames()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("tryGrammaticalAnalysisSqlColumnNames", DbType.class, String.class);
method.setAccessible(true);
String[] columns = (String[]) method.invoke(dataxTask, DbType.MYSQL, "select t1.a, t1.b from test t1 union all select a, t2.b from (select a, b from test) t2");
Assert.assertNotNull(columns);
Assert.assertTrue(columns.length == 2);
Assert.assertEquals("[a, b]", Arrays.toString(columns));
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: tryExecuteSqlResolveColumnNames(BaseDataSource baseDataSource,
* String sql)
*/
@Test
public void testTryExecuteSqlResolveColumnNames()
throws Exception {
// TODO: Test goes here...
}
/**
* Method: buildDataxJsonFile()
*/
@Test
public void testBuildDataxJsonFile()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile");
method.setAccessible(true);
String filePath = (String) method.invoke(dataxTask, null);
Assert.assertNotNull(filePath);
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: buildDataxJobContentJson()
*/
@Test
public void testBuildDataxJobContentJson()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("buildDataxJobContentJson");
method.setAccessible(true);
List<JSONObject> contentList = (List<JSONObject>) method.invoke(dataxTask, null);
Assert.assertNotNull(contentList);
JSONObject content = contentList.get(0);
JSONObject reader = (JSONObject) content.get("reader");
Assert.assertNotNull(reader);
String readerPluginName = (String) reader.get("name");
Assert.assertEquals(DataxUtils.DATAX_READER_PLUGIN_MYSQL, readerPluginName);
JSONObject writer = (JSONObject) content.get("writer");
Assert.assertNotNull(writer);
String writerPluginName = (String) writer.get("name");
Assert.assertEquals(DataxUtils.DATAX_WRITER_PLUGIN_MYSQL, writerPluginName);
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: buildDataxJobSettingJson()
*/
@Test
public void testBuildDataxJobSettingJson()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("buildDataxJobSettingJson");
method.setAccessible(true);
JSONObject setting = (JSONObject) method.invoke(dataxTask, null);
Assert.assertNotNull(setting);
Assert.assertNotNull(setting.get("speed"));
Assert.assertNotNull(setting.get("errorLimit"));
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: buildDataxCoreJson()
*/
@Test
public void testBuildDataxCoreJson()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("buildDataxCoreJson");
method.setAccessible(true);
JSONObject coreConfig = (JSONObject) method.invoke(dataxTask, null);
Assert.assertNotNull(coreConfig);
Assert.assertNotNull(coreConfig.get("transport"));
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: buildShellCommandFile(String jobConfigFilePath)
*/
@Test
public void testBuildShellCommandFile()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class);
method.setAccessible(true);
Assert.assertNotNull(method.invoke(dataxTask, "test.json"));
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
/**
* Method: getParameters
*/
@Test
public void testGetParameters()
throws Exception {
Assert.assertTrue(dataxTask.getParameters() != null);
}
/**
* Method: notNull(Object obj, String message)
*/
@Test
public void testNotNull()
throws Exception {
try {
Method method = DataxTask.class.getDeclaredMethod("notNull", Object.class, String.class);
method.setAccessible(true);
method.invoke(dataxTask, "abc", "test throw RuntimeException");
}
catch (Exception e) {
Assert.fail(e.getMessage());
}
}
}

4
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js

@ -279,6 +279,10 @@ let tasksType = {
'HTTP': { 'HTTP': {
desc: 'HTTP', desc: 'HTTP',
color: '#E46F13' color: '#E46F13'
},
'DATAX': {
desc: 'DataX',
color: '#1fc747'
} }
} }

3
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss

@ -101,6 +101,9 @@
.icos-HTTP { .icos-HTTP {
background: url("../img/toobar_HTTP.png") no-repeat 50% 50%; background: url("../img/toobar_HTTP.png") no-repeat 50% 50%;
} }
.icos-DATAX {
background: url("../img/toolbar_DATAX.png") no-repeat 50% 50%;
}
.toolbar { .toolbar {
width: 60px; width: 60px;
height: 100%; height: 100%;

10
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue

@ -191,7 +191,13 @@
ref="HTTP" ref="HTTP"
:backfill-item="backfillItem"> :backfill-item="backfillItem">
</m-http> </m-http>
<m-datax
v-if="taskType === 'DATAX'"
@on-params="_onParams"
@on-cache-params="_onCacheParams"
ref="DATAX"
:backfill-item="backfillItem">
</m-datax>
</div> </div>
</div> </div>
<div class="bottom-box"> <div class="bottom-box">
@ -216,6 +222,7 @@
import mProcedure from './tasks/procedure' import mProcedure from './tasks/procedure'
import mDependent from './tasks/dependent' import mDependent from './tasks/dependent'
import mHttp from './tasks/http' import mHttp from './tasks/http'
import mDatax from './tasks/datax'
import mSubProcess from './tasks/sub_process' import mSubProcess from './tasks/sub_process'
import mSelectInput from './_source/selectInput' import mSelectInput from './_source/selectInput'
import mTimeoutAlarm from './_source/timeoutAlarm' import mTimeoutAlarm from './_source/timeoutAlarm'
@ -565,6 +572,7 @@
mPython, mPython,
mDependent, mDependent,
mHttp, mHttp,
mDatax,
mSelectInput, mSelectInput,
mTimeoutAlarm, mTimeoutAlarm,
mPriority, mPriority,

292
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/datax.vue

@ -0,0 +1,292 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
<template>
<div class="datax-model">
<m-list-box>
<div slot="text">{{$t('Datasource')}}</div>
<div slot="content">
<m-datasource
ref="refDs"
@on-dsData="_onDsData"
:supportType="['MYSQL','POSTGRESQL', 'ORACLE', 'SQLSERVER']"
:data="{ type:dsType,datasource:datasource }">
</m-datasource>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('SQL Statement')}}</div>
<div slot="content">
<div class="from-mirror">
<textarea
id="code-sql-mirror"
name="code-sql-mirror"
style="opacity: 0;">
</textarea>
</div>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('TargetDataBase')}}</div>
<div slot="content">
<m-datasource
ref="refDt"
@on-dsData="_onDtData"
:supportType="['MYSQL','POSTGRESQL', 'ORACLE', 'SQLSERVER']"
:data="{ type:dtType,datasource:datatarget }">
</m-datasource>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('TargetTable')}}</div>
<div slot="content">
<x-input
type="input"
v-model="targetTable"
:placeholder="$t('Please enter the table of target')"
autocomplete="off">
</x-input>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('TargetDataBase')}}{{$t('Pre Statement')}}</div>
<div slot="content">
<m-statement-list
ref="refPreStatements"
@on-statement-list="_onPreStatements"
:statement-list="preStatements">
</m-statement-list>
</div>
</m-list-box>
<m-list-box>
<div slot="text">{{$t('TargetDataBase')}}{{$t('Post Statement')}}</div>
<div slot="content">
<m-statement-list
ref="refPostStatements"
@on-statement-list="_onPostStatements"
:statement-list="postStatements">
</m-statement-list>
</div>
</m-list-box>
<m-list-box>
<div slot="text">
<span>{{$t('SpeedByte')}}</span>
</div>
<div slot="content">
<m-select-input v-model="jobSpeedByte" :list="[0,1,10,50,100,512]">
</m-select-input>
<span>({{$t('0 means unlimited by byte')}})</span>
</div>
</m-list-box>
<m-list-box>
<div slot="text">
<span>{{$t('SpeedRecord')}}</span>
</div>
<div slot="content">
<m-select-input v-model="jobSpeedRecord" :list="[0,500,1000,1500,2000,2500,3000]">
</m-select-input>
<span>({{$t('0 means unlimited by count')}})</span>
</div>
</m-list-box>
</div>
</template>
<script>
import _ from 'lodash'
import i18n from '@/module/i18n'
import mListBox from './_source/listBox'
import mDatasource from './_source/datasource'
import mLocalParams from './_source/localParams'
import mStatementList from './_source/statementList'
import disabledState from '@/module/mixin/disabledState'
import mSelectInput from '../_source/selectInput'
import codemirror from '@/conf/home/pages/resource/pages/file/pages/_source/codemirror'
let editor
export default {
name: 'datax',
data () {
return {
// Data source type
dsType: '',
// data source
datasource: '',
// Data source type
dtType: '',
// data source
datatarget: '',
// Return to the selected data source
rtDatasource: '',
// Return to the selected data target
rtDatatarget: '',
// Sql statement
sql: '',
// target table
targetTable: '',
// Pre statements
preStatements: [],
// Post statements
postStatements: [],
// speed byte
jobSpeedByte: 0,
// speed record
jobSpeedRecord: 1000,
}
},
mixins: [disabledState],
props: {
backfillItem: Object,
createNodeId: Number
},
methods: {
/**
* return data source
*/
_onDsData (o) {
this.dsType = o.type
this.rtDatasource = o.datasource
},
/**
* return data target
*/
_onDtData (o) {
this.dtType = o.type
this.rtDatatarget = o.datasource
},
/**
* return pre statements
*/
_onPreStatements (a) {
this.preStatements = a
},
/**
* return post statements
*/
_onPostStatements (a) {
this.postStatements = a
},
/**
* verification
*/
_verification () {
if (!editor.getValue()) {
this.$message.warning(`${i18n.$t('Please enter a SQL Statement(required)')}`)
return false
}
// datasource Subcomponent verification
if (!this.$refs.refDs._verifDatasource()) {
return false
}
// datasource Subcomponent verification
if (!this.$refs.refDt._verifDatasource()) {
return false
}
if (!this.targetTable) {
this.$message.warning(`${i18n.$t('Please enter a Target Table(required)')}`)
return false
}
// preStatements Subcomponent verification
if (!this.$refs.refPreStatements._verifProp()) {
return false
}
// postStatements Subcomponent verification
if (!this.$refs.refPostStatements._verifProp()) {
return false
}
// storage
this.$emit('on-params', {
dsType: this.dsType,
dataSource: this.rtDatasource,
dtType: this.dtType,
dataTarget: this.rtDatatarget,
sql: editor.getValue(),
targetTable: this.targetTable,
jobSpeedByte: this.jobSpeedByte * 1024,
jobSpeedRecord: this.jobSpeedRecord,
preStatements: this.preStatements,
postStatements: this.postStatements
})
return true
},
/**
* Processing code highlighting
*/
_handlerEditor () {
// editor
editor = codemirror('code-sql-mirror', {
mode: 'sql',
readOnly: this.isDetails
})
this.keypress = () => {
if (!editor.getOption('readOnly')) {
editor.showHint({
completeSingle: false
})
}
}
// Monitor keyboard
editor.on('keypress', this.keypress)
editor.setValue(this.sql)
return editor
}
},
created () {
let o = this.backfillItem
// Non-null objects represent backfill
if (!_.isEmpty(o)) {
// backfill
this.dsType = o.params.dsType || ''
this.datasource = o.params.dataSource || ''
this.dtType = o.params.dtType || ''
this.datatarget = o.params.dataTarget || ''
this.sql = o.params.sql || ''
this.targetTable = o.params.targetTable || ''
this.jobSpeedByte = o.params.jobSpeedByte / 1024 || 0
this.jobSpeedRecord = o.params.jobSpeedRecord || 0
this.preStatements = o.params.preStatements || []
this.postStatements = o.params.postStatements || []
}
},
mounted () {
setTimeout(() => {
this._handlerEditor()
}, 200)
},
destroyed () {
/**
* Destroy the editor instance
*/
if (editor) {
editor.toTextArea() // Uninstall
editor.off($('.code-sql-mirror'), 'keypress', this.keypress)
}
},
computed: {},
components: { mListBox, mDatasource, mLocalParams, mStatementList, mSelectInput }
}
</script>

BIN
dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_DATAX.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 571 B

BIN
dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_ETL.png

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.0 KiB

10
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@ -509,5 +509,13 @@ export default {
'IP address cannot be empty': 'IP address cannot be empty', 'IP address cannot be empty': 'IP address cannot be empty',
'Please enter the correct IP': 'Please enter the correct IP', 'Please enter the correct IP': 'Please enter the correct IP',
'Please generate token': 'Please generate token', 'Please generate token': 'Please generate token',
'Spark Version': 'Spark Version' 'Spark Version': 'Spark Version',
'TargetDataBase': 'target database',
'TargetTable': 'target table',
'Please enter the table of target': 'Please enter the table of target',
'Please enter a Target Table(required)': 'Please enter a Target Table(required)',
'SpeedByte': 'speed(byte count)',
'SpeedRecord': 'speed(record count)',
'0 means unlimited by byte': '0 means unlimited',
'0 means unlimited by count': '0 means unlimited',
} }

10
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@ -509,5 +509,13 @@ export default {
'IP address cannot be empty': 'IP地址不能为空', 'IP address cannot be empty': 'IP地址不能为空',
'Please enter the correct IP': '请输入正确的IP', 'Please enter the correct IP': '请输入正确的IP',
'Please generate token': '请生成Token', 'Please generate token': '请生成Token',
'Spark Version': 'Spark版本' 'Spark Version': 'Spark版本',
'TargetDataBase': '目标库',
'TargetTable': '目标表',
'Please enter the table of target': '请输入目标表名',
'Please enter a Target Table(required)': '请输入目标表(必填)',
'SpeedByte': '限流(字节数)',
'SpeedRecord': '限流(记录数)',
'0 means unlimited by byte': 'KB0代表不限制',
'0 means unlimited by count': '0代表不限制',
} }

2
pom.xml

@ -713,6 +713,8 @@
<include>**/dao/mapper/CommandMapperTest.java</include> <include>**/dao/mapper/CommandMapperTest.java</include>
<include>**/alert/template/AlertTemplateFactoryTest.java</include> <include>**/alert/template/AlertTemplateFactoryTest.java</include>
<include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include> <include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
<include>**/server/worker/task/datax/DataxTaskTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
</includes> </includes>
<!-- <skip>true</skip> --> <!-- <skip>true</skip> -->
</configuration> </configuration>

Loading…
Cancel
Save