From 592e433b1e2eb1e524b13a4907c568d0f4ed1733 Mon Sep 17 00:00:00 2001 From: Tq Date: Mon, 30 May 2022 14:12:19 +0800 Subject: [PATCH] [Feature][Datasource]replace DataSourceController API with string JSON and cast to DTO (#10276) * replace DataSourceController API with string JSON and cast to DTO * add DataSourceProcessorManager and DataSourceProcessorProvider to datasource processor functions --- .../api/ApiApplicationServer.java | 1 - .../api/controller/DataSourceController.java | 68 +++++++++--------- .../datasource/BaseDataSourceParamDTO.java | 16 ----- .../api/datasource/DataSourceProcessor.java | 15 ++++ .../ClickHouseDataSourceProcessor.java | 17 ++++- .../db2/Db2DataSourceProcessor.java | 16 ++++- .../hive/HiveDataSourceProcessor.java | 16 ++++- .../mysql/MySQLDataSourceProcessor.java | 21 ++++-- .../oracle/OracleDataSourceProcessor.java | 18 ++++- .../PostgreSQLDataSourceProcessor.java | 18 ++++- .../presto/PrestoDataSourceProcessor.java | 18 ++++- .../redshift/RedshiftDataSourceProcessor.java | 26 +++++-- .../spark/SparkDataSourceProcessor.java | 16 ++++- .../SQLServerDataSourceProcessor.java | 18 ++++- .../plugin/DataSourceProcessorManager.java | 63 +++++++++++++++++ .../plugin/DataSourceProcessorProvider.java | 52 ++++++++++++++ .../datasource/api/utils/DataSourceUtils.java | 69 ++++++------------- .../dolphinscheduler/spi/enums/DbType.java | 11 ++- 18 files changed, 347 insertions(+), 132 deletions(-) create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java create mode 100644 dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java index 9e6aa94530..7c3532f7f3 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.api; import org.apache.dolphinscheduler.service.task.TaskPluginManager; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java index ab9d1c29c0..fdaebaa73e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java @@ -17,19 +17,11 @@ package org.apache.dolphinscheduler.api.controller; -import static org.apache.dolphinscheduler.api.enums.Status.AUTHORIZED_DATA_SOURCE; -import static org.apache.dolphinscheduler.api.enums.Status.CONNECTION_TEST_FAILURE; -import static org.apache.dolphinscheduler.api.enums.Status.CONNECT_DATASOURCE_FAILURE; -import static org.apache.dolphinscheduler.api.enums.Status.CREATE_DATASOURCE_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.DELETE_DATA_SOURCE_FAILURE; -import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLES_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLE_COLUMNS_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.KERBEROS_STARTUP_STATE; -import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DATASOURCE_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.UNAUTHORIZED_DATASOURCE; -import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_DATASOURCE_ERROR; -import static org.apache.dolphinscheduler.api.enums.Status.VERIFY_DATASOURCE_NAME_FAILURE; - +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ApiException; @@ -43,9 +35,6 @@ import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSour import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; - -import java.util.Map; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.DeleteMapping; @@ -59,14 +48,23 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; - -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiImplicitParam; -import io.swagger.annotations.ApiImplicitParams; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiParam; import springfox.documentation.annotations.ApiIgnore; +import java.util.Map; + +import static org.apache.dolphinscheduler.api.enums.Status.AUTHORIZED_DATA_SOURCE; +import static org.apache.dolphinscheduler.api.enums.Status.CONNECTION_TEST_FAILURE; +import static org.apache.dolphinscheduler.api.enums.Status.CONNECT_DATASOURCE_FAILURE; +import static org.apache.dolphinscheduler.api.enums.Status.CREATE_DATASOURCE_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.DELETE_DATA_SOURCE_FAILURE; +import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLES_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLE_COLUMNS_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.KERBEROS_STARTUP_STATE; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DATASOURCE_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.UNAUTHORIZED_DATASOURCE; +import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_DATASOURCE_ERROR; +import static org.apache.dolphinscheduler.api.enums.Status.VERIFY_DATASOURCE_NAME_FAILURE; + /** * data source controller */ @@ -82,7 +80,8 @@ public class DataSourceController extends BaseController { * create data source * * @param loginUser login user - * @param dataSourceParam datasource param + * @param jsonStr datasource param + * example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2} * @return create result code */ @ApiOperation(value = "createDataSource", notes = "CREATE_DATA_SOURCE_NOTES") @@ -92,7 +91,8 @@ public class DataSourceController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result createDataSource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true) - @RequestBody BaseDataSourceParamDTO dataSourceParam) { + @RequestBody String jsonStr) { + BaseDataSourceParamDTO dataSourceParam = DataSourceUtils.buildDatasourceParam(jsonStr); return dataSourceService.createDataSource(loginUser, dataSourceParam); } @@ -100,14 +100,15 @@ public class DataSourceController extends BaseController { * updateProcessInstance data source * * @param loginUser login user - * @param id datasource id - * @param dataSourceParam datasource param + * @param id datasource id + * @param jsonStr datasource param + * example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2} * @return update result code */ @ApiOperation(value = "updateDataSource", notes = "UPDATE_DATA_SOURCE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "id", value = "DATA_SOURCE_ID", required = true, dataType = "Integer"), - @ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO") + @ApiImplicitParam(name = "id", value = "DATA_SOURCE_ID", required = true, dataType = "Integer"), + @ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO") }) @PutMapping(value = "/{id}") @ResponseStatus(HttpStatus.OK) @@ -115,7 +116,8 @@ public class DataSourceController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result updateDataSource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @PathVariable(value = "id") Integer id, - @RequestBody BaseDataSourceParamDTO dataSourceParam) { + @RequestBody String jsonStr) { + BaseDataSourceParamDTO dataSourceParam = DataSourceUtils.buildDatasourceParam(jsonStr); dataSourceParam.setId(id); return dataSourceService.updateDataSource(dataSourceParam.getId(), loginUser, dataSourceParam); } @@ -199,19 +201,21 @@ public class DataSourceController extends BaseController { * connect datasource * * @param loginUser login user - * @param dataSourceParam datasource param + * @param jsonStr datasource param + * example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2} * @return connect result code */ @ApiOperation(value = "connectDataSource", notes = "CONNECT_DATA_SOURCE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO") + @ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO") }) @PostMapping(value = "/connect") @ResponseStatus(HttpStatus.OK) @ApiException(CONNECT_DATASOURCE_FAILURE) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result connectDataSource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestBody BaseDataSourceParamDTO dataSourceParam) { + @RequestBody String jsonStr) { + BaseDataSourceParamDTO dataSourceParam = DataSourceUtils.buildDatasourceParam(jsonStr); DataSourceUtils.checkDatasourceParam(dataSourceParam); ConnectionParam connectionParams = DataSourceUtils.buildConnectionParams(dataSourceParam); return dataSourceService.checkConnection(dataSourceParam.getType(), connectionParams); diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/BaseDataSourceParamDTO.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/BaseDataSourceParamDTO.java index 7d9cbb2cf6..28fe5c3c0b 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/BaseDataSourceParamDTO.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/BaseDataSourceParamDTO.java @@ -32,9 +32,6 @@ import org.apache.dolphinscheduler.spi.enums.DbType; import java.io.Serializable; import java.util.Map; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; - /** * Basic datasource params submitted to api. *

@@ -49,19 +46,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; * see {@link PrestoDataSourceParamDTO} * see {@link RedshiftDataSourceParamDTO} */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") -@JsonSubTypes(value = { - @JsonSubTypes.Type(value = MySQLDataSourceParamDTO.class, name = "MYSQL"), - @JsonSubTypes.Type(value = PostgreSQLDataSourceParamDTO.class, name = "POSTGRESQL"), - @JsonSubTypes.Type(value = HiveDataSourceParamDTO.class, name = "HIVE"), - @JsonSubTypes.Type(value = SparkDataSourceParamDTO.class, name = "SPARK"), - @JsonSubTypes.Type(value = ClickHouseDataSourceParamDTO.class, name = "CLICKHOUSE"), - @JsonSubTypes.Type(value = OracleDataSourceParamDTO.class, name = "ORACLE"), - @JsonSubTypes.Type(value = SQLServerDataSourceParamDTO.class, name = "SQLSERVER"), - @JsonSubTypes.Type(value = Db2DataSourceParamDTO.class, name = "DB2"), - @JsonSubTypes.Type(value = PrestoDataSourceParamDTO.class, name = "PRESTO"), - @JsonSubTypes.Type(value = RedshiftDataSourceParamDTO.class, name = "REDSHIFT"), -}) public abstract class BaseDataSourceParamDTO implements Serializable { protected Integer id; diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java index e4603165b7..3cfac25663 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java @@ -26,6 +26,15 @@ import java.sql.SQLException; public interface DataSourceProcessor { + /** + * cast JSON to relate DTO + * + * @param paramJson + * @return {@link BaseDataSourceParamDTO} + */ + BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson); + + /** * check datasource param is valid */ @@ -33,6 +42,7 @@ public interface DataSourceProcessor { /** * get Datasource Client UniqueId + * * @return UniqueId */ String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType); @@ -90,4 +100,9 @@ public interface DataSourceProcessor { * @return {@link DbType} */ DbType getDbType(); + + /** + * get datasource processor + */ + DataSourceProcessor create(); } diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessor.java index ea71d25a8d..509b9221d5 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessor.java @@ -17,8 +17,11 @@ package org.apache.dolphinscheduler.plugin.datasource.api.datasource.clickhouse; +import com.google.auto.service.AutoService; +import org.apache.commons.collections4.MapUtils; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -26,16 +29,21 @@ import org.apache.dolphinscheduler.spi.utils.Constants; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.apache.commons.collections4.MapUtils; - import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.LinkedHashMap; import java.util.Map; +@AutoService(DataSourceProcessor.class) public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor { + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, ClickHouseDataSourceParamDTO.class); + } + + @Override public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { ClickHouseConnectionParam connectionParams = (ClickHouseConnectionParam) createConnectionParams(connectionJson); @@ -110,6 +118,11 @@ public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor { return DbType.CLICKHOUSE; } + @Override + public DataSourceProcessor create() { + return new ClickHouseDataSourceProcessor(); + } + private String transformOther(Map otherMap) { if (MapUtils.isEmpty(otherMap)) { return null; diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessor.java index 7e0548c71f..16d326fecb 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessor.java @@ -17,8 +17,11 @@ package org.apache.dolphinscheduler.plugin.datasource.api.datasource.db2; +import com.google.auto.service.AutoService; +import org.apache.commons.collections4.MapUtils; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; @@ -27,16 +30,20 @@ import org.apache.dolphinscheduler.spi.utils.Constants; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.apache.commons.collections4.MapUtils; - import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.LinkedHashMap; import java.util.Map; +@AutoService(DataSourceProcessor.class) public class Db2DataSourceProcessor extends AbstractDataSourceProcessor { + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, Db2DataSourceParamDTO.class); + } + @Override public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { Db2ConnectionParam connectionParams = (Db2ConnectionParam) createConnectionParams(connectionJson); @@ -106,6 +113,11 @@ public class Db2DataSourceProcessor extends AbstractDataSourceProcessor { return DbType.DB2; } + @Override + public DataSourceProcessor create() { + return new Db2DataSourceProcessor(); + } + @Override public String getValidationQuery() { return Constants.DB2_VALIDATION_QUERY; diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/hive/HiveDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/hive/HiveDataSourceProcessor.java index 898ceb39aa..8c3d94eac6 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/hive/HiveDataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/hive/HiveDataSourceProcessor.java @@ -17,8 +17,11 @@ package org.apache.dolphinscheduler.plugin.datasource.api.datasource.hive; +import com.google.auto.service.AutoService; +import org.apache.commons.collections4.MapUtils; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils; import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; @@ -28,8 +31,6 @@ import org.apache.dolphinscheduler.spi.utils.Constants; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.apache.commons.collections4.MapUtils; - import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; @@ -37,8 +38,14 @@ import java.sql.SQLException; import java.util.LinkedHashMap; import java.util.Map; +@AutoService(DataSourceProcessor.class) public class HiveDataSourceProcessor extends AbstractDataSourceProcessor { + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, HiveDataSourceParamDTO.class); + } + @Override public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { HiveDataSourceParamDTO hiveDataSourceParamDTO = new HiveDataSourceParamDTO(); @@ -136,6 +143,11 @@ public class HiveDataSourceProcessor extends AbstractDataSourceProcessor { return DbType.HIVE; } + @Override + public DataSourceProcessor create() { + return new HiveDataSourceProcessor(); + } + private String transformOther(Map otherMap) { if (MapUtils.isEmpty(otherMap)) { return null; diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/mysql/MySQLDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/mysql/MySQLDataSourceProcessor.java index a0c5347a7c..a4d4ec08eb 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/mysql/MySQLDataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/mysql/MySQLDataSourceProcessor.java @@ -17,8 +17,11 @@ package org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql; +import com.google.auto.service.AutoService; +import org.apache.commons.collections4.MapUtils; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; @@ -26,8 +29,8 @@ import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.utils.Constants; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; - -import org.apache.commons.collections4.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DriverManager; @@ -36,9 +39,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +@AutoService(DataSourceProcessor.class) public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor { private final Logger logger = LoggerFactory.getLogger(MySQLDataSourceProcessor.class); @@ -53,6 +54,11 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor { private static final String APPEND_PARAMS = "allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"; + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, MySQLDataSourceParamDTO.class); + } + @Override public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { MySQLConnectionParam connectionParams = (MySQLConnectionParam) createConnectionParams(connectionJson); @@ -138,6 +144,11 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor { return DbType.MYSQL; } + @Override + public DataSourceProcessor create() { + return new MySQLDataSourceProcessor(); + } + private String transformOther(Map paramMap) { if (MapUtils.isEmpty(paramMap)) { return null; diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessor.java index 44c4ab8a46..b57f6a97f8 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessor.java @@ -17,8 +17,12 @@ package org.apache.dolphinscheduler.plugin.datasource.api.datasource.oracle; +import com.google.auto.service.AutoService; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; @@ -27,9 +31,6 @@ import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.utils.Constants; import org.apache.dolphinscheduler.spi.utils.JSONUtils; -import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang.StringUtils; - import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; @@ -38,8 +39,14 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +@AutoService(DataSourceProcessor.class) public class OracleDataSourceProcessor extends AbstractDataSourceProcessor { + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, OracleDataSourceParamDTO.class); + } + @Override public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { OracleConnectionParam connectionParams = (OracleConnectionParam) createConnectionParams(connectionJson); @@ -128,6 +135,11 @@ public class OracleDataSourceProcessor extends AbstractDataSourceProcessor { return DbType.ORACLE; } + @Override + public DataSourceProcessor create() { + return new OracleDataSourceProcessor(); + } + private String transformOther(Map otherMap) { if (MapUtils.isEmpty(otherMap)) { return null; diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessor.java index f14302f1e7..55ebcc4bb5 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessor.java @@ -17,8 +17,12 @@ package org.apache.dolphinscheduler.plugin.datasource.api.datasource.postgresql; +import com.google.auto.service.AutoService; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; @@ -26,17 +30,20 @@ import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.utils.Constants; import org.apache.dolphinscheduler.spi.utils.JSONUtils; -import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang.StringUtils; - import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.LinkedHashMap; import java.util.Map; +@AutoService(DataSourceProcessor.class) public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor { + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, PostgreSQLDataSourceParamDTO.class); + } + @Override public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { PostgreSQLConnectionParam connectionParams = (PostgreSQLConnectionParam) createConnectionParams(connectionJson); @@ -111,6 +118,11 @@ public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor { return DbType.POSTGRESQL; } + @Override + public DataSourceProcessor create() { + return new PostgreSQLDataSourceProcessor(); + } + private String transformOther(Map otherMap) { if (MapUtils.isEmpty(otherMap)) { return null; diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessor.java index 8fe1a7231c..bf324eaffc 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessor.java @@ -17,8 +17,12 @@ package org.apache.dolphinscheduler.plugin.datasource.api.datasource.presto; +import com.google.auto.service.AutoService; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; @@ -26,9 +30,6 @@ import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.utils.Constants; import org.apache.dolphinscheduler.spi.utils.JSONUtils; -import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang.StringUtils; - import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; @@ -37,8 +38,14 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +@AutoService(DataSourceProcessor.class) public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor { + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, PrestoDataSourceParamDTO.class); + } + @Override public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { PrestoConnectionParam connectionParams = (PrestoConnectionParam) createConnectionParams(connectionJson); @@ -113,6 +120,11 @@ public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor { return DbType.PRESTO; } + @Override + public DataSourceProcessor create() { + return new PrestoDataSourceProcessor(); + } + private String transformOther(Map otherMap) { if (MapUtils.isNotEmpty(otherMap)) { List list = new ArrayList<>(); diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessor.java index 5a9c521fce..e002791b76 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessor.java @@ -17,8 +17,12 @@ package org.apache.dolphinscheduler.plugin.datasource.api.datasource.redshift; +import com.google.auto.service.AutoService; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; @@ -26,9 +30,6 @@ import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.utils.Constants; import org.apache.dolphinscheduler.spi.utils.JSONUtils; -import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang.StringUtils; - import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; @@ -37,18 +38,24 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +@AutoService(DataSourceProcessor.class) public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor { + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, RedshiftDataSourceParamDTO.class); + } + @Override public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { RedshiftConnectionParam - connectionParams = (RedshiftConnectionParam) createConnectionParams(connectionJson); + connectionParams = (RedshiftConnectionParam) createConnectionParams(connectionJson); String[] hostSeperator = connectionParams.getAddress().split(Constants.DOUBLE_SLASH); String[] hostPortArray = hostSeperator[hostSeperator.length - 1].split(Constants.COMMA); RedshiftDataSourceParamDTO - redshiftDatasourceParamDTO = new RedshiftDataSourceParamDTO(); + redshiftDatasourceParamDTO = new RedshiftDataSourceParamDTO(); redshiftDatasourceParamDTO.setPort(Integer.parseInt(hostPortArray[0].split(Constants.COLON)[1])); redshiftDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]); redshiftDatasourceParamDTO.setDatabase(connectionParams.getDatabase()); @@ -65,7 +72,7 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor { String jdbcUrl = address + Constants.SLASH + redshiftParam.getDatabase(); RedshiftConnectionParam - redshiftConnectionParam = new RedshiftConnectionParam(); + redshiftConnectionParam = new RedshiftConnectionParam(); redshiftConnectionParam.setUser(redshiftParam.getUserName()); redshiftConnectionParam.setPassword(PasswordUtils.encodePassword(redshiftParam.getPassword())); redshiftConnectionParam.setOther(transformOther(redshiftParam.getOther())); @@ -97,7 +104,7 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor { @Override public String getJdbcUrl(ConnectionParam connectionParam) { RedshiftConnectionParam - redshiftConnectionParam = (RedshiftConnectionParam) connectionParam; + redshiftConnectionParam = (RedshiftConnectionParam) connectionParam; if (!StringUtils.isEmpty(redshiftConnectionParam.getOther())) { return String.format("%s?%s", redshiftConnectionParam.getJdbcUrl(), redshiftConnectionParam.getOther()); } @@ -117,6 +124,11 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor { return DbType.REDSHIFT; } + @Override + public DataSourceProcessor create() { + return new RedshiftDataSourceProcessor(); + } + private String transformOther(Map otherMap) { if (MapUtils.isNotEmpty(otherMap)) { List list = new ArrayList<>(otherMap.size()); diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessor.java index 69fcd4ff24..2d25f427b0 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessor.java @@ -17,8 +17,11 @@ package org.apache.dolphinscheduler.plugin.datasource.api.datasource.spark; +import com.google.auto.service.AutoService; +import org.apache.commons.collections4.MapUtils; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils; import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; @@ -28,8 +31,6 @@ import org.apache.dolphinscheduler.spi.utils.Constants; import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.apache.dolphinscheduler.spi.utils.StringUtils; -import org.apache.commons.collections4.MapUtils; - import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; @@ -40,8 +41,14 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +@AutoService(DataSourceProcessor.class) public class SparkDataSourceProcessor extends AbstractDataSourceProcessor { + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, SparkDataSourceParamDTO.class); + } + @Override public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { SparkConnectionParam connectionParams = (SparkConnectionParam) createConnectionParams(connectionJson); @@ -138,6 +145,11 @@ public class SparkDataSourceProcessor extends AbstractDataSourceProcessor { return DbType.SPARK; } + @Override + public DataSourceProcessor create() { + return new SparkDataSourceProcessor(); + } + private String transformOther(Map otherMap) { if (MapUtils.isEmpty(otherMap)) { return null; diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/sqlserver/SQLServerDataSourceProcessor.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/sqlserver/SQLServerDataSourceProcessor.java index 44855d1fb6..e33c3c56fd 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/sqlserver/SQLServerDataSourceProcessor.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/sqlserver/SQLServerDataSourceProcessor.java @@ -17,8 +17,12 @@ package org.apache.dolphinscheduler.plugin.datasource.api.datasource.sqlserver; +import com.google.auto.service.AutoService; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; @@ -26,17 +30,20 @@ import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.utils.Constants; import org.apache.dolphinscheduler.spi.utils.JSONUtils; -import org.apache.commons.collections4.MapUtils; -import org.apache.commons.lang.StringUtils; - import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.LinkedHashMap; import java.util.Map; +@AutoService(DataSourceProcessor.class) public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor { + @Override + public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) { + return JSONUtils.parseObject(paramJson, SQLServerDataSourceParamDTO.class); + } + @Override public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { SQLServerConnectionParam connectionParams = (SQLServerConnectionParam) createConnectionParams(connectionJson); @@ -109,6 +116,11 @@ public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor { return DbType.SQLSERVER; } + @Override + public DataSourceProcessor create() { + return new SQLServerDataSourceProcessor(); + } + private String transformOther(Map otherMap) { if (MapUtils.isEmpty(otherMap)) { return null; diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java new file mode 100644 index 0000000000..de55f383a3 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.api.plugin; + +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static java.lang.String.format; + +public class DataSourceProcessorManager { + private static final Logger logger = LoggerFactory.getLogger(DataSourceProcessorManager.class); + + private static final Map dataSourceProcessorMap = new ConcurrentHashMap<>(); + + public Map getDataSourceProcessorMap() { + return Collections.unmodifiableMap(dataSourceProcessorMap); + } + + public void installProcessor() { + final Set names = new HashSet<>(); + + ServiceLoader.load(DataSourceProcessor.class).forEach(factory -> { + final String name = factory.getDbType().name(); + + logger.info("start register processor: {}", name); + if (!names.add(name)) { + throw new IllegalStateException(format("Duplicate datasource plugins named '%s'", name)); + } + loadDatasourceClient(factory); + + logger.info("done register processor: {}", name); + + }); + } + + private void loadDatasourceClient(DataSourceProcessor processor) { + DataSourceProcessor instance = processor.create(); + dataSourceProcessorMap.put(processor.getDbType().name(), instance); + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java new file mode 100644 index 0000000000..7cdd5c98d1 --- /dev/null +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.datasource.api.plugin; + +import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + + +public class DataSourceProcessorProvider { + private static final Logger logger = LoggerFactory.getLogger(DataSourceProcessorProvider.class); + + private DataSourceProcessorManager dataSourcePluginManager; + + private DataSourceProcessorProvider() { + initDataSourceProcessorPlugin(); + } + + private static class DataSourceClientProviderHolder { + private static final DataSourceProcessorProvider INSTANCE = new DataSourceProcessorProvider(); + } + + public static DataSourceProcessorProvider getInstance() { + return DataSourceClientProviderHolder.INSTANCE; + } + + public Map getDataSourceProcessorMap() { + return dataSourcePluginManager.getDataSourceProcessorMap(); + } + + private void initDataSourceProcessorPlugin() { + dataSourcePluginManager = new DataSourceProcessorManager(); + dataSourcePluginManager.installProcessor(); + } +} diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java index edcd1a332d..b4a85e4674 100644 --- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java +++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java @@ -17,44 +17,26 @@ package org.apache.dolphinscheduler.plugin.datasource.api.utils; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor; -import org.apache.dolphinscheduler.plugin.datasource.api.datasource.clickhouse.ClickHouseDataSourceProcessor; -import org.apache.dolphinscheduler.plugin.datasource.api.datasource.db2.Db2DataSourceProcessor; -import org.apache.dolphinscheduler.plugin.datasource.api.datasource.hive.HiveDataSourceProcessor; -import org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql.MySQLDataSourceProcessor; -import org.apache.dolphinscheduler.plugin.datasource.api.datasource.oracle.OracleDataSourceProcessor; -import org.apache.dolphinscheduler.plugin.datasource.api.datasource.postgresql.PostgreSQLDataSourceProcessor; -import org.apache.dolphinscheduler.plugin.datasource.api.datasource.presto.PrestoDataSourceProcessor; -import org.apache.dolphinscheduler.plugin.datasource.api.datasource.redshift.RedshiftDataSourceProcessor; -import org.apache.dolphinscheduler.plugin.datasource.api.datasource.spark.SparkDataSourceProcessor; -import org.apache.dolphinscheduler.plugin.datasource.api.datasource.sqlserver.SQLServerDataSourceProcessor; +import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; - -import java.sql.Connection; - +import org.apache.dolphinscheduler.spi.utils.JSONUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.util.Map; + public class DataSourceUtils { - private DataSourceUtils() { + public DataSourceUtils() { } private static final Logger logger = LoggerFactory.getLogger(DataSourceUtils.class); - private static final DataSourceProcessor mysqlProcessor = new MySQLDataSourceProcessor(); - private static final DataSourceProcessor postgreSqlProcessor = new PostgreSQLDataSourceProcessor(); - private static final DataSourceProcessor hiveProcessor = new HiveDataSourceProcessor(); - private static final DataSourceProcessor sparkProcessor = new SparkDataSourceProcessor(); - private static final DataSourceProcessor clickhouseProcessor = new ClickHouseDataSourceProcessor(); - private static final DataSourceProcessor oracleProcessor = new OracleDataSourceProcessor(); - private static final DataSourceProcessor sqlServerProcessor = new SQLServerDataSourceProcessor(); - private static final DataSourceProcessor db2PROCESSOR = new Db2DataSourceProcessor(); - private static final DataSourceProcessor prestoPROCESSOR = new PrestoDataSourceProcessor(); - private static final DataSourceProcessor redshiftProcessor = new RedshiftDataSourceProcessor(); - /** * check datasource param * @@ -103,30 +85,11 @@ public class DataSourceUtils { } public static DataSourceProcessor getDatasourceProcessor(DbType dbType) { - switch (dbType) { - case MYSQL: - return mysqlProcessor; - case POSTGRESQL: - return postgreSqlProcessor; - case HIVE: - return hiveProcessor; - case SPARK: - return sparkProcessor; - case CLICKHOUSE: - return clickhouseProcessor; - case ORACLE: - return oracleProcessor; - case SQLSERVER: - return sqlServerProcessor; - case DB2: - return db2PROCESSOR; - case PRESTO: - return prestoPROCESSOR; - case REDSHIFT: - return redshiftProcessor; - default: - throw new IllegalArgumentException("datasource type illegal:" + dbType); + Map dataSourceProcessorMap = DataSourceProcessorProvider.getInstance().getDataSourceProcessorMap(); + if (!dataSourceProcessorMap.containsKey(dbType.name())) { + throw new IllegalArgumentException("illegal datasource type"); } + return dataSourceProcessorMap.get(dbType.name()); } /** @@ -135,4 +98,14 @@ public class DataSourceUtils { public static String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) { return getDatasourceProcessor(dbType).getDatasourceUniqueId(connectionParam, dbType); } + + /** + * build connection url + */ + public static BaseDataSourceParamDTO buildDatasourceParam(String param) { + JsonNode jsonNodes = JSONUtils.parseObject(param); + + return getDatasourceProcessor(DbType.ofName(jsonNodes.get("type").asText().toUpperCase())) + .castDatasourceParamDTO(param); + } } diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java index 099ec7f767..869111621a 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java @@ -17,13 +17,14 @@ package org.apache.dolphinscheduler.spi.enums; -import static java.util.stream.Collectors.toMap; +import com.baomidou.mybatisplus.annotation.EnumValue; +import com.google.common.base.Functions; import java.util.Arrays; import java.util.Map; +import java.util.NoSuchElementException; -import com.baomidou.mybatisplus.annotation.EnumValue; -import com.google.common.base.Functions; +import static java.util.stream.Collectors.toMap; public enum DbType { MYSQL(0, "mysql"), @@ -66,6 +67,10 @@ public enum DbType { return null; } + public static DbType ofName(String name) { + return Arrays.stream(DbType.values()).filter(e -> e.name().equals(name)).findFirst().orElseThrow(() -> new NoSuchElementException("no such db type")); + } + public boolean isHive() { return this == DbType.HIVE; }