@ -17,29 +17,16 @@
package org.apache.dolphinscheduler.server.worker.task.datax ;
import java.io.File ;
import java.nio.charset.StandardCharsets ;
import java.nio.file.Files ;
import java.nio.file.Path ;
import java.nio.file.StandardOpenOption ;
import java.nio.file.attribute.FileAttribute ;
import java.nio.file.attribute.PosixFilePermission ;
import java.nio.file.attribute.PosixFilePermissions ;
import java.sql.Connection ;
import java.sql.DriverManager ;
import java.sql.PreparedStatement ;
import java.sql.ResultSet ;
import java.sql.ResultSetMetaData ;
import java.sql.SQLException ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.Map ;
import java.util.Set ;
import com.alibaba.druid.sql.ast.SQLStatement ;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr ;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr ;
import com.alibaba.druid.sql.ast.statement.* ;
import com.alibaba.druid.sql.parser.SQLStatementParser ;
import com.fasterxml.jackson.databind.node.ArrayNode ;
import com.fasterxml.jackson.databind.node.ObjectNode ;
import org.apache.commons.io.FileUtils ;
import org.apache.dolphinscheduler.common.Constants ;
import org.apache.dolphinscheduler.common.enums.CommandType ;
import org.apache.dolphinscheduler.common.enums.DataType ;
import org.apache.dolphinscheduler.common.enums.DbType ;
import org.apache.dolphinscheduler.common.process.Property ;
import org.apache.dolphinscheduler.common.task.AbstractParameters ;
@ -50,8 +37,6 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils ;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource ;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory ;
import org.apache.dolphinscheduler.dao.entity.DataSource ;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance ;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext ;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext ;
import org.apache.dolphinscheduler.server.utils.DataxUtils ;
@ -59,20 +44,21 @@ import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask ;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult ;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor ;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext ;
import org.apache.dolphinscheduler.service.process.ProcessService ;
import org.slf4j.Logger ;
import com.alibaba.druid.sql.ast.SQLStatement ;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr ;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr ;
import com.alibaba.druid.sql.ast.statement.SQLSelect ;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem ;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock ;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement ;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery ;
import com.alibaba.druid.sql.parser.SQLStatementParser ;
import com.alibaba.fastjson.JSONObject ;
import java.io.File ;
import java.nio.charset.StandardCharsets ;
import java.nio.file.Files ;
import java.nio.file.Path ;
import java.nio.file.StandardOpenOption ;
import java.nio.file.attribute.FileAttribute ;
import java.nio.file.attribute.PosixFilePermission ;
import java.nio.file.attribute.PosixFilePermissions ;
import java.sql.* ;
import java.util.ArrayList ;
import java.util.List ;
import java.util.Map ;
import java.util.Set ;
/ * *
@ -173,19 +159,19 @@ public class DataxTask extends AbstractTask {
* /
@Override
public void cancelApplication ( boolean cancelApplication )
throws Exception {
throws Exception {
// cancel process
shellCommandExecutor . cancelApplication ( ) ;
}
/ * *
* build datax configuration file
*
*
* @return datax json file name
* @throws Exception if error throws Exception
* /
private String buildDataxJsonFile ( )
throws Exception {
throws Exception {
// generate json
String fileName = String . format ( "%s/%s_job.json" ,
taskExecutionContext . getExecutePath ( ) ,
@ -216,14 +202,14 @@ public class DataxTask extends AbstractTask {
}
} else {
ObjectNode job = JSONUtils . createObjectNode ( ) ;
job . putArray ( "content" ) . addAll ( buildDataxJobContentJson ( ) ) ;
job . set ( "setting" , buildDataxJobSettingJson ( ) ) ;
JSONObject job = new JSONObject ( ) ;
job . put ( "content" , buildDataxJobContentJson ( ) ) ;
job . put ( "setting" , buildDataxJobSettingJson ( ) ) ;
ObjectNode root = JSONUtils . createObjectNode ( ) ;
JSONObject root = new JSONObject ( ) ;
root . put ( "job" , job ) ;
root . put ( "core" , buildDataxCoreJson ( ) ) ;
root . set ( "job" , job ) ;
root . set ( "core" , buildDataxCoreJson ( ) ) ;
json = root . toString ( ) ;
}
@ -236,13 +222,13 @@ public class DataxTask extends AbstractTask {
/ * *
* build datax job config
*
*
* @return collection of datax job config JSONObject
* @throws SQLException if error throws SQLException
* /
private List < JSONObject > buildDataxJobContentJson ( ) throws SQLException {
DataxTaskExecutionContext dataxTaskExecutionContext = taskExecutionContext . getDataxTaskExecutionContext ( ) ;
private List < ObjectNode > buildDataxJobContentJson ( ) throws SQLException {
DataxTaskExecutionContext dataxTaskExecutionContext = taskExecutionContext . getDataxTaskExecutionContext ( ) ;
BaseDataSource dataSourceCfg = DataSourceFactory . getDatasource ( DbType . of ( dataxTaskExecutionContext . getSourcetype ( ) ) ,
dataxTaskExecutionContext . getSourceConnectionParams ( ) ) ;
@ -250,50 +236,76 @@ public class DataxTask extends AbstractTask {
BaseDataSource dataTargetCfg = DataSourceFactory . getDatasource ( DbType . of ( dataxTaskExecutionContext . getTargetType ( ) ) ,
dataxTaskExecutionContext . getTargetConnectionParams ( ) ) ;
List < JSONObject > readerConnArr = new ArrayList < > ( ) ;
JSONObject readerConn = new JSONObject ( ) ;
readerConn . put ( "querySql" , new String [ ] { dataXParameters . getSql ( ) } ) ;
readerConn . put ( "jdbcUrl" , new String [ ] { dataSourceCfg . getJdbcUrl ( ) } ) ;
List < ObjectNode > readerConnArr = new ArrayList < > ( ) ;
ObjectNode readerConn = JSONUtils . createObjectNode ( ) ;
ArrayNode sqlArr = readerConn . putArray ( "querySql" ) ;
for ( String sql : new String [ ] { dataXParameters . getSql ( ) } ) {
sqlArr . add ( sql ) ;
}
ArrayNode urlArr = readerConn . putArray ( "jdbcUrl" ) ;
for ( String url : new String [ ] { dataSourceCfg . getJdbcUrl ( ) } ) {
urlArr . add ( url ) ;
}
readerConnArr . add ( readerConn ) ;
JSONObject readerParam = new JSONObject ( ) ;
ObjectNode readerParam = JSONUtils . createObjectNode ( ) ;
readerParam . put ( "username" , dataSourceCfg . getUser ( ) ) ;
readerParam . put ( "password" , dataSourceCfg . getPassword ( ) ) ;
readerParam . put ( "connection" , readerConnArr ) ;
readerParam . putArray ( "connection" ) . addAll ( readerConnArr ) ;
JSONObject reader = new JSONObject ( ) ;
ObjectNode reader = JSONUtils . createObjectNode ( ) ;
reader . put ( "name" , DataxUtils . getReaderPluginName ( DbType . of ( dataxTaskExecutionContext . getSourcetype ( ) ) ) ) ;
reader . put ( "parameter" , readerParam ) ;
reader . set ( "parameter" , readerParam ) ;
List < ObjectNode > writerConnArr = new ArrayList < > ( ) ;
ObjectNode writerConn = JSONUtils . createObjectNode ( ) ;
ArrayNode tableArr = writerConn . putArray ( "table" ) ;
for ( String table : new String [ ] { dataXParameters . getTargetTable ( ) } ) {
tableArr . add ( table ) ;
}
List < JSONObject > writerConnArr = new ArrayList < > ( ) ;
JSONObject writerConn = new JSONObject ( ) ;
writerConn . put ( "table" , new String [ ] { dataXParameters . getTargetTable ( ) } ) ;
writerConn . put ( "jdbcUrl" , dataTargetCfg . getJdbcUrl ( ) ) ;
writerConnArr . add ( writerConn ) ;
JSON Object writerParam = new JSONObject ( ) ;
ObjectNode writerParam = JSONUtils . createObjectNode ( ) ;
writerParam . put ( "username" , dataTargetCfg . getUser ( ) ) ;
writerParam . put ( "password" , dataTargetCfg . getPassword ( ) ) ;
writerParam . put ( "column" ,
parsingSqlColumnNames ( DbType . of ( dataxTaskExecutionContext . getSourcetype ( ) ) ,
DbType . of ( dataxTaskExecutionContext . getTargetType ( ) ) ,
dataSourceCfg , dataXParameters . getSql ( ) ) ) ;
writerParam . put ( "connection" , writerConnArr ) ;
String [ ] columns = parsingSqlColumnNames ( DbType . of ( dataxTaskExecutionContext . getSourcetype ( ) ) ,
DbType . of ( dataxTaskExecutionContext . getTargetType ( ) ) ,
dataSourceCfg , dataXParameters . getSql ( ) ) ;
ArrayNode columnArr = writerParam . putArray ( "column" ) ;
for ( String column : columns ) {
columnArr . add ( column ) ;
}
writerParam . putArray ( "connection" ) . addAll ( writerConnArr ) ;
if ( CollectionUtils . isNotEmpty ( dataXParameters . getPreStatements ( ) ) ) {
writerParam . put ( "preSql" , dataXParameters . getPreStatements ( ) ) ;
ArrayNode preSqlArr = writerParam . putArray ( "preSql" ) ;
for ( String preSql : dataXParameters . getPreStatements ( ) ) {
preSqlArr . add ( preSql ) ;
}
}
if ( CollectionUtils . isNotEmpty ( dataXParameters . getPostStatements ( ) ) ) {
writerParam . put ( "postSql" , dataXParameters . getPostStatements ( ) ) ;
ArrayNode postSqlArr = writerParam . putArray ( "postSql" ) ;
for ( String postSql : dataXParameters . getPostStatements ( ) ) {
postSqlArr . add ( postSql ) ;
}
}
JSONObject writer = new JSONObject ( ) ;
ObjectNode writer = JSONUtils . createObjectNode ( ) ;
writer . put ( "name" , DataxUtils . getWriterPluginName ( DbType . of ( dataxTaskExecutionContext . getTargetType ( ) ) ) ) ;
writer . pu t( "parameter" , writerParam ) ;
writer . se t( "parameter" , writerParam ) ;
List < JSON Object> contentList = new ArrayList < > ( ) ;
JSON Object content = new JSONObject ( ) ;
List < ObjectNode > contentList = new ArrayList < > ( ) ;
ObjectNode content = JSONUtils . createObjectNode ( ) ;
content . put ( "reader" , reader ) ;
content . put ( "writer" , writer ) ;
contentList . add ( content ) ;
@ -303,11 +315,13 @@ public class DataxTask extends AbstractTask {
/ * *
* build datax setting config
*
*
* @return datax setting config JSONObject
* /
private JSONObject buildDataxJobSettingJson ( ) {
JSONObject speed = new JSONObject ( ) ;
private ObjectNode buildDataxJobSettingJson ( ) {
ObjectNode speed = JSONUtils . createObjectNode ( ) ;
speed . put ( "channel" , DATAX_CHANNEL_COUNT ) ;
if ( dataXParameters . getJobSpeedByte ( ) > 0 ) {
@ -318,19 +332,20 @@ public class DataxTask extends AbstractTask {
speed . put ( "record" , dataXParameters . getJobSpeedRecord ( ) ) ;
}
JSON Object errorLimit = new JSONObject ( ) ;
ObjectNode errorLimit = JSONUtils . createObjectNode ( ) ;
errorLimit . put ( "record" , 0 ) ;
errorLimit . put ( "percentage" , 0 ) ;
JSON Object setting = new JSONObject ( ) ;
ObjectNode setting = JSONUtils . createObjectNode ( ) ;
setting . put ( "speed" , speed ) ;
setting . put ( "errorLimit" , errorLimit ) ;
return setting ;
}
private JSONObject buildDataxCoreJson ( ) {
JSONObject speed = new JSONObject ( ) ;
private ObjectNode buildDataxCoreJson ( ) {
ObjectNode speed = JSONUtils . createObjectNode ( ) ;
speed . put ( "channel" , DATAX_CHANNEL_COUNT ) ;
if ( dataXParameters . getJobSpeedByte ( ) > 0 ) {
@ -341,26 +356,26 @@ public class DataxTask extends AbstractTask {
speed . put ( "record" , dataXParameters . getJobSpeedRecord ( ) ) ;
}
JSON Object channel = new JSONObject ( ) ;
channel . pu t( "speed" , speed ) ;
ObjectNode channel = JSONUtils . createObjectNode ( ) ;
channel . se t( "speed" , speed ) ;
JSON Object transport = new JSONObject ( ) ;
transport . pu t( "channel" , channel ) ;
ObjectNode transport = JSONUtils . createObjectNode ( ) ;
transport . se t( "channel" , channel ) ;
JSON Object core = new JSONObject ( ) ;
core . pu t( "transport" , transport ) ;
ObjectNode core = JSONUtils . createObjectNode ( ) ;
core . se t( "transport" , transport ) ;
return core ;
}
/ * *
* create command
*
*
* @return shell command file name
* @throws Exception if error throws Exception
* /
private String buildShellCommandFile ( String jobConfigFilePath )
throws Exception {
throws Exception {
// generate scripts
String fileName = String . format ( "%s/%s_node.sh" ,
taskExecutionContext . getExecutePath ( ) ,
@ -411,7 +426,7 @@ public class DataxTask extends AbstractTask {
/ * *
* parsing synchronized column names in SQL statements
*
*
* @param dsType
* the database type of the data source
* @param dtType
@ -437,7 +452,7 @@ public class DataxTask extends AbstractTask {
/ * *
* try grammatical parsing column
*
*
* @param dbType
* database type
* @param sql
@ -467,7 +482,7 @@ public class DataxTask extends AbstractTask {
}
notNull ( selectItemList ,
String . format ( "select query type [%s] is not support" , sqlSelect . getQuery ( ) . toString ( ) ) ) ;
String . format ( "select query type [%s] is not support" , sqlSelect . getQuery ( ) . toString ( ) ) ) ;
columnNames = new String [ selectItemList . size ( ) ] ;
for ( int i = 0 ; i < selectItemList . size ( ) ; i + + ) {
@ -487,12 +502,12 @@ public class DataxTask extends AbstractTask {
}
} else {
throw new RuntimeException (
String . format ( "grammatical analysis sql column [ %s ] failed" , item . toString ( ) ) ) ;
String . format ( "grammatical analysis sql column [ %s ] failed" , item . toString ( ) ) ) ;
}
if ( columnName = = null ) {
throw new RuntimeException (
String . format ( "grammatical analysis sql column [ %s ] failed" , item . toString ( ) ) ) ;
String . format ( "grammatical analysis sql column [ %s ] failed" , item . toString ( ) ) ) ;
}
columnNames [ i ] = columnName ;
@ -508,7 +523,7 @@ public class DataxTask extends AbstractTask {
/ * *
* try to execute sql to resolve column names
*
*
* @param baseDataSource
* the database connection parameters
* @param sql
@ -521,10 +536,10 @@ public class DataxTask extends AbstractTask {
sql = sql . replace ( ";" , "" ) ;
try (
Connection connection = DriverManager . getConnection ( baseDataSource . getJdbcUrl ( ) , baseDataSource . getUser ( ) ,
baseDataSource . getPassword ( ) ) ;
PreparedStatement stmt = connection . prepareStatement ( sql ) ;
ResultSet resultSet = stmt . executeQuery ( ) ) {
Connection connection = DriverManager . getConnection ( baseDataSource . getJdbcUrl ( ) , baseDataSource . getUser ( ) ,
baseDataSource . getPassword ( ) ) ;
PreparedStatement stmt = connection . prepareStatement ( sql ) ;
ResultSet resultSet = stmt . executeQuery ( ) ) {
ResultSetMetaData md = resultSet . getMetaData ( ) ;
int num = md . getColumnCount ( ) ;
@ -552,4 +567,4 @@ public class DataxTask extends AbstractTask {
}
}
}
}