Browse Source

[Fix-5808][Server] When we try to transfer data using datax between different types of data sources, the worker will exit with ClassCastException (#5809)

* bug fix

* fix bug

* simplify the code format

* add a new parameter to make it easier to understand.
2.0.7-release
kyoty 3 years ago committed by GitHub
parent
commit
bca92157a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java

24
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java

@ -252,7 +252,7 @@ public class DataxTask extends AbstractTask {
}
ArrayNode urlArr = readerConn.putArray("jdbcUrl");
urlArr.add(DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()), dataSourceCfg));
urlArr.add(DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDsType()), dataSourceCfg));
readerConnArr.add(readerConn);
@ -270,7 +270,7 @@ public class DataxTask extends AbstractTask {
ArrayNode tableArr = writerConn.putArray("table");
tableArr.add(dataXParameters.getTargetTable());
writerConn.put("jdbcUrl", DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDsType()), dataTargetCfg));
writerConn.put("jdbcUrl", DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()), dataTargetCfg));
writerConnArr.add(writerConn);
ObjectNode writerParam = JSONUtils.createObjectNode();
@ -437,31 +437,31 @@ public class DataxTask extends AbstractTask {
}
public String loadJvmEnv(DataxParameters dataXParameters) {
int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms();
int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx();
int xms = Math.max(dataXParameters.getXms(), 1);
int xmx = Math.max(dataXParameters.getXmx(), 1);
return String.format(JVM_PARAM, xms, xmx);
}
/**
* parsing synchronized column names in SQL statements
*
* @param dsType the database type of the data source
* @param dtType the database type of the data target
* @param sourceType the database type of the data source
* @param targetType the database type of the data target
* @param dataSourceCfg the database connection parameters of the data source
* @param sql sql for data synchronization
* @return Keyword converted column names
*/
private String[] parsingSqlColumnNames(DbType dsType, DbType dtType, BaseConnectionParam dataSourceCfg, String sql) {
String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(dsType, sql);
private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg, String sql) {
String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(sourceType, sql);
if (columnNames == null || columnNames.length == 0) {
logger.info("try to execute sql analysis query column name");
columnNames = tryExecuteSqlResolveColumnNames(dataSourceCfg, sql);
columnNames = tryExecuteSqlResolveColumnNames(sourceType, dataSourceCfg, sql);
}
notNull(columnNames, String.format("parsing sql columns failed : %s", sql));
return DataxUtils.convertKeywordsColumns(dtType, columnNames);
return DataxUtils.convertKeywordsColumns(targetType, columnNames);
}
/**
@ -542,13 +542,13 @@ public class DataxTask extends AbstractTask {
* @param sql sql for data synchronization
* @return column name array
*/
public String[] tryExecuteSqlResolveColumnNames(BaseConnectionParam baseDataSource, String sql) {
public String[] tryExecuteSqlResolveColumnNames(DbType sourceType, BaseConnectionParam baseDataSource, String sql) {
String[] columnNames;
sql = String.format("SELECT t.* FROM ( %s ) t WHERE 0 = 1", sql);
sql = sql.replace(";", "");
try (
Connection connection = DatasourceUtil.getConnection(DbType.valueOf(dataXParameters.getDtType()), baseDataSource);
Connection connection = DatasourceUtil.getConnection(sourceType, baseDataSource);
PreparedStatement stmt = connection.prepareStatement(sql);
ResultSet resultSet = stmt.executeQuery()) {

Loading…
Cancel
Save