Browse Source

[Feature][Datasource]replace DataSourceController API with string JSON and cast to DTO (#10276)

* replace DataSourceController API with string JSON and cast to DTO

* add DataSourceProcessorManager and DataSourceProcessorProvider to datasource processor functions
3.1.0-release
Tq 2 years ago committed by GitHub
parent
commit
592e433b1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
  2. 68
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java
  3. 16
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/BaseDataSourceParamDTO.java
  4. 15
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java
  5. 17
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessor.java
  6. 16
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessor.java
  7. 16
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/hive/HiveDataSourceProcessor.java
  8. 21
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/mysql/MySQLDataSourceProcessor.java
  9. 18
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessor.java
  10. 18
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessor.java
  11. 18
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessor.java
  12. 26
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessor.java
  13. 16
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessor.java
  14. 18
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/sqlserver/SQLServerDataSourceProcessor.java
  15. 63
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java
  16. 52
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
  17. 69
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java
  18. 11
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java

1
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.api;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

68
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java

@ -17,19 +17,11 @@
package org.apache.dolphinscheduler.api.controller;
import static org.apache.dolphinscheduler.api.enums.Status.AUTHORIZED_DATA_SOURCE;
import static org.apache.dolphinscheduler.api.enums.Status.CONNECTION_TEST_FAILURE;
import static org.apache.dolphinscheduler.api.enums.Status.CONNECT_DATASOURCE_FAILURE;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_DATASOURCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_DATA_SOURCE_FAILURE;
import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLES_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLE_COLUMNS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.KERBEROS_STARTUP_STATE;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DATASOURCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.UNAUTHORIZED_DATASOURCE;
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_DATASOURCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.VERIFY_DATASOURCE_NAME_FAILURE;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException;
@ -43,9 +35,6 @@ import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSour
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.DeleteMapping;
@ -59,14 +48,23 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
import java.util.Map;
import static org.apache.dolphinscheduler.api.enums.Status.AUTHORIZED_DATA_SOURCE;
import static org.apache.dolphinscheduler.api.enums.Status.CONNECTION_TEST_FAILURE;
import static org.apache.dolphinscheduler.api.enums.Status.CONNECT_DATASOURCE_FAILURE;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_DATASOURCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_DATA_SOURCE_FAILURE;
import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLES_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLE_COLUMNS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.KERBEROS_STARTUP_STATE;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DATASOURCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.UNAUTHORIZED_DATASOURCE;
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_DATASOURCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.VERIFY_DATASOURCE_NAME_FAILURE;
/**
* data source controller
*/
@ -82,7 +80,8 @@ public class DataSourceController extends BaseController {
* create data source
*
* @param loginUser login user
* @param dataSourceParam datasource param
* @param jsonStr datasource param
* example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2}
* @return create result code
*/
@ApiOperation(value = "createDataSource", notes = "CREATE_DATA_SOURCE_NOTES")
@ -92,7 +91,8 @@ public class DataSourceController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result createDataSource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true)
@RequestBody BaseDataSourceParamDTO dataSourceParam) {
@RequestBody String jsonStr) {
BaseDataSourceParamDTO dataSourceParam = DataSourceUtils.buildDatasourceParam(jsonStr);
return dataSourceService.createDataSource(loginUser, dataSourceParam);
}
@ -100,14 +100,15 @@ public class DataSourceController extends BaseController {
* updateProcessInstance data source
*
* @param loginUser login user
* @param id datasource id
* @param dataSourceParam datasource param
* @param id datasource id
* @param jsonStr datasource param
* example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2}
* @return update result code
*/
@ApiOperation(value = "updateDataSource", notes = "UPDATE_DATA_SOURCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "id", value = "DATA_SOURCE_ID", required = true, dataType = "Integer"),
@ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO")
@ApiImplicitParam(name = "id", value = "DATA_SOURCE_ID", required = true, dataType = "Integer"),
@ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO")
})
@PutMapping(value = "/{id}")
@ResponseStatus(HttpStatus.OK)
@ -115,7 +116,8 @@ public class DataSourceController extends BaseController {
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result updateDataSource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@PathVariable(value = "id") Integer id,
@RequestBody BaseDataSourceParamDTO dataSourceParam) {
@RequestBody String jsonStr) {
BaseDataSourceParamDTO dataSourceParam = DataSourceUtils.buildDatasourceParam(jsonStr);
dataSourceParam.setId(id);
return dataSourceService.updateDataSource(dataSourceParam.getId(), loginUser, dataSourceParam);
}
@ -199,19 +201,21 @@ public class DataSourceController extends BaseController {
* connect datasource
*
* @param loginUser login user
* @param dataSourceParam datasource param
* @param jsonStr datasource param
* example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2}
* @return connect result code
*/
@ApiOperation(value = "connectDataSource", notes = "CONNECT_DATA_SOURCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO")
@ApiImplicitParam(name = "dataSourceParam", value = "DATA_SOURCE_PARAM", required = true, dataType = "BaseDataSourceParamDTO")
})
@PostMapping(value = "/connect")
@ResponseStatus(HttpStatus.OK)
@ApiException(CONNECT_DATASOURCE_FAILURE)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result connectDataSource(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestBody BaseDataSourceParamDTO dataSourceParam) {
@RequestBody String jsonStr) {
BaseDataSourceParamDTO dataSourceParam = DataSourceUtils.buildDatasourceParam(jsonStr);
DataSourceUtils.checkDatasourceParam(dataSourceParam);
ConnectionParam connectionParams = DataSourceUtils.buildConnectionParams(dataSourceParam);
return dataSourceService.checkConnection(dataSourceParam.getType(), connectionParams);

16
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/BaseDataSourceParamDTO.java

@ -32,9 +32,6 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import java.io.Serializable;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
* Basic datasource params submitted to api.
* <p>
@ -49,19 +46,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
* see {@link PrestoDataSourceParamDTO}
* see {@link RedshiftDataSourceParamDTO}
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(value = MySQLDataSourceParamDTO.class, name = "MYSQL"),
@JsonSubTypes.Type(value = PostgreSQLDataSourceParamDTO.class, name = "POSTGRESQL"),
@JsonSubTypes.Type(value = HiveDataSourceParamDTO.class, name = "HIVE"),
@JsonSubTypes.Type(value = SparkDataSourceParamDTO.class, name = "SPARK"),
@JsonSubTypes.Type(value = ClickHouseDataSourceParamDTO.class, name = "CLICKHOUSE"),
@JsonSubTypes.Type(value = OracleDataSourceParamDTO.class, name = "ORACLE"),
@JsonSubTypes.Type(value = SQLServerDataSourceParamDTO.class, name = "SQLSERVER"),
@JsonSubTypes.Type(value = Db2DataSourceParamDTO.class, name = "DB2"),
@JsonSubTypes.Type(value = PrestoDataSourceParamDTO.class, name = "PRESTO"),
@JsonSubTypes.Type(value = RedshiftDataSourceParamDTO.class, name = "REDSHIFT"),
})
public abstract class BaseDataSourceParamDTO implements Serializable {
protected Integer id;

15
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/DataSourceProcessor.java

@ -26,6 +26,15 @@ import java.sql.SQLException;
public interface DataSourceProcessor {
/**
* cast JSON to relate DTO
*
* @param paramJson
* @return {@link BaseDataSourceParamDTO}
*/
BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson);
/**
* check datasource param is valid
*/
@ -33,6 +42,7 @@ public interface DataSourceProcessor {
/**
* get Datasource Client UniqueId
*
* @return UniqueId
*/
String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType);
@ -90,4 +100,9 @@ public interface DataSourceProcessor {
* @return {@link DbType}
*/
DbType getDbType();
/**
* get datasource processor
*/
DataSourceProcessor create();
}

17
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessor.java

@ -17,8 +17,11 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.clickhouse;
import com.google.auto.service.AutoService;
import org.apache.commons.collections4.MapUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
@ -26,16 +29,21 @@ import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.MapUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
@AutoService(DataSourceProcessor.class)
public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson, ClickHouseDataSourceParamDTO.class);
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
ClickHouseConnectionParam connectionParams = (ClickHouseConnectionParam) createConnectionParams(connectionJson);
@ -110,6 +118,11 @@ public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor {
return DbType.CLICKHOUSE;
}
@Override
public DataSourceProcessor create() {
return new ClickHouseDataSourceProcessor();
}
private String transformOther(Map<String, String> otherMap) {
if (MapUtils.isEmpty(otherMap)) {
return null;

16
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessor.java

@ -17,8 +17,11 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.db2;
import com.google.auto.service.AutoService;
import org.apache.commons.collections4.MapUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@ -27,16 +30,20 @@ import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.MapUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
@AutoService(DataSourceProcessor.class)
public class Db2DataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson, Db2DataSourceParamDTO.class);
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
Db2ConnectionParam connectionParams = (Db2ConnectionParam) createConnectionParams(connectionJson);
@ -106,6 +113,11 @@ public class Db2DataSourceProcessor extends AbstractDataSourceProcessor {
return DbType.DB2;
}
@Override
public DataSourceProcessor create() {
return new Db2DataSourceProcessor();
}
@Override
public String getValidationQuery() {
return Constants.DB2_VALIDATION_QUERY;

16
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/hive/HiveDataSourceProcessor.java

@ -17,8 +17,11 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.hive;
import com.google.auto.service.AutoService;
import org.apache.commons.collections4.MapUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
@ -28,8 +31,6 @@ import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.MapUtils;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
@ -37,8 +38,14 @@ import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
@AutoService(DataSourceProcessor.class)
public class HiveDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson, HiveDataSourceParamDTO.class);
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
HiveDataSourceParamDTO hiveDataSourceParamDTO = new HiveDataSourceParamDTO();
@ -136,6 +143,11 @@ public class HiveDataSourceProcessor extends AbstractDataSourceProcessor {
return DbType.HIVE;
}
@Override
public DataSourceProcessor create() {
return new HiveDataSourceProcessor();
}
private String transformOther(Map<String, String> otherMap) {
if (MapUtils.isEmpty(otherMap)) {
return null;

21
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/mysql/MySQLDataSourceProcessor.java

@ -17,8 +17,11 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql;
import com.google.auto.service.AutoService;
import org.apache.commons.collections4.MapUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@ -26,8 +29,8 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
@ -36,9 +39,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@AutoService(DataSourceProcessor.class)
public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
private final Logger logger = LoggerFactory.getLogger(MySQLDataSourceProcessor.class);
@ -53,6 +54,11 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
private static final String APPEND_PARAMS = "allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false";
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson, MySQLDataSourceParamDTO.class);
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
MySQLConnectionParam connectionParams = (MySQLConnectionParam) createConnectionParams(connectionJson);
@ -138,6 +144,11 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
return DbType.MYSQL;
}
@Override
public DataSourceProcessor create() {
return new MySQLDataSourceProcessor();
}
private String transformOther(Map<String, String> paramMap) {
if (MapUtils.isEmpty(paramMap)) {
return null;

18
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessor.java

@ -17,8 +17,12 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.oracle;
import com.google.auto.service.AutoService;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@ -27,9 +31,6 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
@ -38,8 +39,14 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@AutoService(DataSourceProcessor.class)
public class OracleDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson, OracleDataSourceParamDTO.class);
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
OracleConnectionParam connectionParams = (OracleConnectionParam) createConnectionParams(connectionJson);
@ -128,6 +135,11 @@ public class OracleDataSourceProcessor extends AbstractDataSourceProcessor {
return DbType.ORACLE;
}
@Override
public DataSourceProcessor create() {
return new OracleDataSourceProcessor();
}
private String transformOther(Map<String, String> otherMap) {
if (MapUtils.isEmpty(otherMap)) {
return null;

18
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessor.java

@ -17,8 +17,12 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.postgresql;
import com.google.auto.service.AutoService;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@ -26,17 +30,20 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
@AutoService(DataSourceProcessor.class)
public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson, PostgreSQLDataSourceParamDTO.class);
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
PostgreSQLConnectionParam connectionParams = (PostgreSQLConnectionParam) createConnectionParams(connectionJson);
@ -111,6 +118,11 @@ public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
return DbType.POSTGRESQL;
}
@Override
public DataSourceProcessor create() {
return new PostgreSQLDataSourceProcessor();
}
private String transformOther(Map<String, String> otherMap) {
if (MapUtils.isEmpty(otherMap)) {
return null;

18
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessor.java

@ -17,8 +17,12 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.presto;
import com.google.auto.service.AutoService;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@ -26,9 +30,6 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
@ -37,8 +38,14 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@AutoService(DataSourceProcessor.class)
public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson, PrestoDataSourceParamDTO.class);
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
PrestoConnectionParam connectionParams = (PrestoConnectionParam) createConnectionParams(connectionJson);
@ -113,6 +120,11 @@ public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor {
return DbType.PRESTO;
}
@Override
public DataSourceProcessor create() {
return new PrestoDataSourceProcessor();
}
private String transformOther(Map<String, String> otherMap) {
if (MapUtils.isNotEmpty(otherMap)) {
List<String> list = new ArrayList<>();

26
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessor.java

@ -17,8 +17,12 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.redshift;
import com.google.auto.service.AutoService;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@ -26,9 +30,6 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
@ -37,18 +38,24 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@AutoService(DataSourceProcessor.class)
public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson, RedshiftDataSourceParamDTO.class);
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
RedshiftConnectionParam
connectionParams = (RedshiftConnectionParam) createConnectionParams(connectionJson);
connectionParams = (RedshiftConnectionParam) createConnectionParams(connectionJson);
String[] hostSeperator = connectionParams.getAddress().split(Constants.DOUBLE_SLASH);
String[] hostPortArray = hostSeperator[hostSeperator.length - 1].split(Constants.COMMA);
RedshiftDataSourceParamDTO
redshiftDatasourceParamDTO = new RedshiftDataSourceParamDTO();
redshiftDatasourceParamDTO = new RedshiftDataSourceParamDTO();
redshiftDatasourceParamDTO.setPort(Integer.parseInt(hostPortArray[0].split(Constants.COLON)[1]));
redshiftDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]);
redshiftDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
@ -65,7 +72,7 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
String jdbcUrl = address + Constants.SLASH + redshiftParam.getDatabase();
RedshiftConnectionParam
redshiftConnectionParam = new RedshiftConnectionParam();
redshiftConnectionParam = new RedshiftConnectionParam();
redshiftConnectionParam.setUser(redshiftParam.getUserName());
redshiftConnectionParam.setPassword(PasswordUtils.encodePassword(redshiftParam.getPassword()));
redshiftConnectionParam.setOther(transformOther(redshiftParam.getOther()));
@ -97,7 +104,7 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public String getJdbcUrl(ConnectionParam connectionParam) {
RedshiftConnectionParam
redshiftConnectionParam = (RedshiftConnectionParam) connectionParam;
redshiftConnectionParam = (RedshiftConnectionParam) connectionParam;
if (!StringUtils.isEmpty(redshiftConnectionParam.getOther())) {
return String.format("%s?%s", redshiftConnectionParam.getJdbcUrl(), redshiftConnectionParam.getOther());
}
@ -117,6 +124,11 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
return DbType.REDSHIFT;
}
@Override
public DataSourceProcessor create() {
return new RedshiftDataSourceProcessor();
}
private String transformOther(Map<String, String> otherMap) {
if (MapUtils.isNotEmpty(otherMap)) {
List<String> list = new ArrayList<>(otherMap.size());

16
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessor.java

@ -17,8 +17,11 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.spark;
import com.google.auto.service.AutoService;
import org.apache.commons.collections4.MapUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
@ -28,8 +31,6 @@ import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.MapUtils;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
@ -40,8 +41,14 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@AutoService(DataSourceProcessor.class)
public class SparkDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson, SparkDataSourceParamDTO.class);
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
SparkConnectionParam connectionParams = (SparkConnectionParam) createConnectionParams(connectionJson);
@ -138,6 +145,11 @@ public class SparkDataSourceProcessor extends AbstractDataSourceProcessor {
return DbType.SPARK;
}
@Override
public DataSourceProcessor create() {
return new SparkDataSourceProcessor();
}
private String transformOther(Map<String, String> otherMap) {
if (MapUtils.isEmpty(otherMap)) {
return null;

18
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/sqlserver/SQLServerDataSourceProcessor.java

@ -17,8 +17,12 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.sqlserver;
import com.google.auto.service.AutoService;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@ -26,17 +30,20 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
@AutoService(DataSourceProcessor.class)
public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return JSONUtils.parseObject(paramJson, SQLServerDataSourceParamDTO.class);
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
SQLServerConnectionParam connectionParams = (SQLServerConnectionParam) createConnectionParams(connectionJson);
@ -109,6 +116,11 @@ public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor {
return DbType.SQLSERVER;
}
@Override
public DataSourceProcessor create() {
return new SQLServerDataSourceProcessor();
}
private String transformOther(Map<String, String> otherMap) {
if (MapUtils.isEmpty(otherMap)) {
return null;

63
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static java.lang.String.format;
public class DataSourceProcessorManager {
private static final Logger logger = LoggerFactory.getLogger(DataSourceProcessorManager.class);
private static final Map<String, DataSourceProcessor> dataSourceProcessorMap = new ConcurrentHashMap<>();
public Map<String, DataSourceProcessor> getDataSourceProcessorMap() {
return Collections.unmodifiableMap(dataSourceProcessorMap);
}
public void installProcessor() {
final Set<String> names = new HashSet<>();
ServiceLoader.load(DataSourceProcessor.class).forEach(factory -> {
final String name = factory.getDbType().name();
logger.info("start register processor: {}", name);
if (!names.add(name)) {
throw new IllegalStateException(format("Duplicate datasource plugins named '%s'", name));
}
loadDatasourceClient(factory);
logger.info("done register processor: {}", name);
});
}
private void loadDatasourceClient(DataSourceProcessor processor) {
DataSourceProcessor instance = processor.create();
dataSourceProcessorMap.put(processor.getDbType().name(), instance);
}
}

52
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class DataSourceProcessorProvider {
private static final Logger logger = LoggerFactory.getLogger(DataSourceProcessorProvider.class);
private DataSourceProcessorManager dataSourcePluginManager;
private DataSourceProcessorProvider() {
initDataSourceProcessorPlugin();
}
private static class DataSourceClientProviderHolder {
private static final DataSourceProcessorProvider INSTANCE = new DataSourceProcessorProvider();
}
public static DataSourceProcessorProvider getInstance() {
return DataSourceClientProviderHolder.INSTANCE;
}
public Map<String, DataSourceProcessor> getDataSourceProcessorMap() {
return dataSourcePluginManager.getDataSourceProcessorMap();
}
private void initDataSourceProcessorPlugin() {
dataSourcePluginManager = new DataSourceProcessorManager();
dataSourcePluginManager.installProcessor();
}
}

69
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java

@ -17,44 +17,26 @@
package org.apache.dolphinscheduler.plugin.datasource.api.utils;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.clickhouse.ClickHouseDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.db2.Db2DataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.hive.HiveDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.mysql.MySQLDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.oracle.OracleDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.postgresql.PostgreSQLDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.presto.PrestoDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.redshift.RedshiftDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.spark.SparkDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.sqlserver.SQLServerDataSourceProcessor;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.util.Map;
public class DataSourceUtils {
private DataSourceUtils() {
public DataSourceUtils() {
}
private static final Logger logger = LoggerFactory.getLogger(DataSourceUtils.class);
private static final DataSourceProcessor mysqlProcessor = new MySQLDataSourceProcessor();
private static final DataSourceProcessor postgreSqlProcessor = new PostgreSQLDataSourceProcessor();
private static final DataSourceProcessor hiveProcessor = new HiveDataSourceProcessor();
private static final DataSourceProcessor sparkProcessor = new SparkDataSourceProcessor();
private static final DataSourceProcessor clickhouseProcessor = new ClickHouseDataSourceProcessor();
private static final DataSourceProcessor oracleProcessor = new OracleDataSourceProcessor();
private static final DataSourceProcessor sqlServerProcessor = new SQLServerDataSourceProcessor();
private static final DataSourceProcessor db2PROCESSOR = new Db2DataSourceProcessor();
private static final DataSourceProcessor prestoPROCESSOR = new PrestoDataSourceProcessor();
private static final DataSourceProcessor redshiftProcessor = new RedshiftDataSourceProcessor();
/**
* check datasource param
*
@ -103,30 +85,11 @@ public class DataSourceUtils {
}
public static DataSourceProcessor getDatasourceProcessor(DbType dbType) {
switch (dbType) {
case MYSQL:
return mysqlProcessor;
case POSTGRESQL:
return postgreSqlProcessor;
case HIVE:
return hiveProcessor;
case SPARK:
return sparkProcessor;
case CLICKHOUSE:
return clickhouseProcessor;
case ORACLE:
return oracleProcessor;
case SQLSERVER:
return sqlServerProcessor;
case DB2:
return db2PROCESSOR;
case PRESTO:
return prestoPROCESSOR;
case REDSHIFT:
return redshiftProcessor;
default:
throw new IllegalArgumentException("datasource type illegal:" + dbType);
Map<String, DataSourceProcessor> dataSourceProcessorMap = DataSourceProcessorProvider.getInstance().getDataSourceProcessorMap();
if (!dataSourceProcessorMap.containsKey(dbType.name())) {
throw new IllegalArgumentException("illegal datasource type");
}
return dataSourceProcessorMap.get(dbType.name());
}
/**
@ -135,4 +98,14 @@ public class DataSourceUtils {
public static String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) {
return getDatasourceProcessor(dbType).getDatasourceUniqueId(connectionParam, dbType);
}
/**
* build connection url
*/
public static BaseDataSourceParamDTO buildDatasourceParam(String param) {
JsonNode jsonNodes = JSONUtils.parseObject(param);
return getDatasourceProcessor(DbType.ofName(jsonNodes.get("type").asText().toUpperCase()))
.castDatasourceParamDTO(param);
}
}

11
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/enums/DbType.java

@ -17,13 +17,14 @@
package org.apache.dolphinscheduler.spi.enums;
import static java.util.stream.Collectors.toMap;
import com.baomidou.mybatisplus.annotation.EnumValue;
import com.google.common.base.Functions;
import java.util.Arrays;
import java.util.Map;
import java.util.NoSuchElementException;
import com.baomidou.mybatisplus.annotation.EnumValue;
import com.google.common.base.Functions;
import static java.util.stream.Collectors.toMap;
public enum DbType {
MYSQL(0, "mysql"),
@ -66,6 +67,10 @@ public enum DbType {
return null;
}
public static DbType ofName(String name) {
return Arrays.stream(DbType.values()).filter(e -> e.name().equals(name)).findFirst().orElseThrow(() -> new NoSuchElementException("no such db type"));
}
public boolean isHive() {
return this == DbType.HIVE;
}

Loading…
Cancel
Save