diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index 84b2c27b5d..c30326d03e 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -252,7 +252,7 @@ public class DataxTask extends AbstractTask { } ArrayNode urlArr = readerConn.putArray("jdbcUrl"); - urlArr.add(DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()), dataSourceCfg)); + urlArr.add(DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDsType()), dataSourceCfg)); readerConnArr.add(readerConn); @@ -270,7 +270,7 @@ public class DataxTask extends AbstractTask { ArrayNode tableArr = writerConn.putArray("table"); tableArr.add(dataXParameters.getTargetTable()); - writerConn.put("jdbcUrl", DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDsType()), dataTargetCfg)); + writerConn.put("jdbcUrl", DatasourceUtil.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()), dataTargetCfg)); writerConnArr.add(writerConn); ObjectNode writerParam = JSONUtils.createObjectNode(); @@ -437,31 +437,31 @@ public class DataxTask extends AbstractTask { } public String loadJvmEnv(DataxParameters dataXParameters) { - int xms = dataXParameters.getXms() < 1 ? 1 : dataXParameters.getXms(); - int xmx = dataXParameters.getXmx() < 1 ? 1 : dataXParameters.getXmx(); + int xms = Math.max(dataXParameters.getXms(), 1); + int xmx = Math.max(dataXParameters.getXmx(), 1); return String.format(JVM_PARAM, xms, xmx); } /** * parsing synchronized column names in SQL statements * - * @param dsType the database type of the data source - * @param dtType the database type of the data target + * @param sourceType the database type of the data source + * @param targetType the database type of the data target * @param dataSourceCfg the database connection parameters of the data source * @param sql sql for data synchronization * @return Keyword converted column names */ - private String[] parsingSqlColumnNames(DbType dsType, DbType dtType, BaseConnectionParam dataSourceCfg, String sql) { - String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(dsType, sql); + private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg, String sql) { + String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(sourceType, sql); if (columnNames == null || columnNames.length == 0) { logger.info("try to execute sql analysis query column name"); - columnNames = tryExecuteSqlResolveColumnNames(dataSourceCfg, sql); + columnNames = tryExecuteSqlResolveColumnNames(sourceType, dataSourceCfg, sql); } notNull(columnNames, String.format("parsing sql columns failed : %s", sql)); - return DataxUtils.convertKeywordsColumns(dtType, columnNames); + return DataxUtils.convertKeywordsColumns(targetType, columnNames); } /** @@ -542,13 +542,13 @@ public class DataxTask extends AbstractTask { * @param sql sql for data synchronization * @return column name array */ - public String[] tryExecuteSqlResolveColumnNames(BaseConnectionParam baseDataSource, String sql) { + public String[] tryExecuteSqlResolveColumnNames(DbType sourceType, BaseConnectionParam baseDataSource, String sql) { String[] columnNames; sql = String.format("SELECT t.* FROM ( %s ) t WHERE 0 = 1", sql); sql = sql.replace(";", ""); try ( - Connection connection = DatasourceUtil.getConnection(DbType.valueOf(dataXParameters.getDtType()), baseDataSource); + Connection connection = DatasourceUtil.getConnection(sourceType, baseDataSource); PreparedStatement stmt = connection.prepareStatement(sql); ResultSet resultSet = stmt.executeQuery()) {