From 1199a916b35b5f3fef92ba9896df164625051e84 Mon Sep 17 00:00:00 2001 From: xxsc0529 <93303124+xxsc0529@users.noreply.github.com> Date: Tue, 16 Jul 2024 09:27:05 +0800 Subject: [PATCH] [Feature-16132][Task Plugin] support for oceanbase datax task (#16281) * Add support for oceanbase's datax task --feat * fix code style --------- Co-authored-by: xiangzihao <460888207@qq.com> --- .../spi/datasource/BaseConnectionParam.java | 2 ++ .../plugin/task/datax/DataxTask.java | 8 ++++++-- .../plugin/task/datax/DataxUtils.java | 17 +++++++++++++++++ .../task/components/node/fields/use-datax.ts | 3 ++- 4 files changed, 27 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/BaseConnectionParam.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/BaseConnectionParam.java index 40e809c3d7..343dc26c01 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/BaseConnectionParam.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/BaseConnectionParam.java @@ -44,6 +44,8 @@ public abstract class BaseConnectionParam implements ConnectionParam { protected String validationQuery; + protected String compatibleMode; + protected Map other; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index 6490b8b0b0..f4a78957b5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.spi.enums.Flag; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; import java.io.File; import java.nio.charset.StandardCharsets; @@ -420,7 +421,7 @@ public class DataxTask extends AbstractTask { */ private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg, String sql) { - String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(sourceType, sql); + String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(sourceType, sql, dataSourceCfg.getCompatibleMode()); if (columnNames == null || columnNames.length == 0) { log.info("try to execute sql analysis query column name"); @@ -440,11 +441,14 @@ public class DataxTask extends AbstractTask { * @return column name array * @throws RuntimeException if error throws RuntimeException */ - private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql) { + private String[] tryGrammaticalAnalysisSqlColumnNames(DbType dbType, String sql, String compatibleMode) { String[] columnNames; try { SQLStatementParser parser = DataxUtils.getSqlStatementParser(dbType, sql); + if (StringUtils.isNotBlank(compatibleMode)) { + parser = DataxUtils.getSqlStatementParser(compatibleMode, sql); + } if (parser == null) { log.warn("database driver [{}] is not support grammatical analysis sql", dbType); return new String[0]; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxUtils.java index 33b1f2779e..edc2e3f2a7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxUtils.java @@ -42,6 +42,8 @@ public class DataxUtils { public static final String DATAX_READER_PLUGIN_RDBMS = "rdbmsreader"; + public static final String DATAX_READER_PLUGIN_OCEANBASE = "oceanbasev10reader"; + public static final String DATAX_WRITER_PLUGIN_MYSQL = "mysqlwriter"; public static final String DATAX_WRITER_PLUGIN_POSTGRESQL = "postgresqlwriter"; @@ -55,6 +57,8 @@ public class DataxUtils { public static final String DATAX_WRITER_PLUGIN_RDBMS = "rdbmswriter"; + public static final String DATAX_WRITER_PLUGIN_OCEANBASE = "oceanbasev10writer"; + public static String getReaderPluginName(DbType dbType) { switch (dbType) { case MYSQL: @@ -67,6 +71,8 @@ public class DataxUtils { return DATAX_READER_PLUGIN_SQLSERVER; case CLICKHOUSE: return DATAX_READER_PLUGIN_CLICKHOUSE; + case OCEANBASE: + return DATAX_READER_PLUGIN_OCEANBASE; case HIVE: case PRESTO: default: @@ -88,6 +94,8 @@ public class DataxUtils { return DATAX_WRITER_PLUGIN_CLICKHOUSE; case DATABEND: return DATAX_WRITER_PLUGIN_DATABEND; + case OCEANBASE: + return DATAX_WRITER_PLUGIN_OCEANBASE; case HIVE: case PRESTO: default: @@ -116,6 +124,13 @@ public class DataxUtils { } } + public static SQLStatementParser getSqlStatementParser(String compatibleMode, String sql) { + if (compatibleMode.toLowerCase().equals(DbType.ORACLE.getName())) { + return new OracleStatementParser(sql); + } + return new MySqlStatementParser(sql); + } + public static String[] convertKeywordsColumns(DbType dbType, String[] columns) { if (columns == null) { return null; @@ -152,6 +167,8 @@ public class DataxUtils { return String.format("`%s`", column); case DATABEND: return String.format("`%s`", column); + case OCEANBASE: + return String.format("`%s`", column); default: return column; } diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts index 24552bfb0b..1c67019e18 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datax.ts @@ -131,7 +131,8 @@ export function useDataX(model: { [field: string]: any }): IJsonItem[] { 'CLICKHOUSE', 'DATABEND', 'HIVE', - 'PRESTO' + 'PRESTO', + 'OCEANBASE' ] onMounted(() => { initConstants()