Browse Source

Fix datasource jdbc url param doesn't work (#13098)

(cherry picked from commit ab96a3d0)
3.0.4-release
Wenjun Ruan 2 years ago committed by Jay Chung
parent
commit
750cf62641
  1. 39
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
  2. 14
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessor.java
  3. 29
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessor.java
  4. 30
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessor.java
  5. 59
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/hive/HiveDataSourceProcessor.java
  6. 29
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/mysql/MySQLDataSourceProcessor.java
  7. 28
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessor.java
  8. 34
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessor.java
  9. 22
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessor.java
  10. 25
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessor.java
  11. 22
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessor.java
  12. 21
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/sqlserver/SQLServerDataSourceProcessor.java
  13. 9
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/provider/JDBCDataSourceProvider.java
  14. 72
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessorTest.java
  15. 7
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessorTest.java
  16. 9
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessorTest.java
  17. 9
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessorTest.java
  18. 3
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessorTest.java
  19. 12
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessorTest.java
  20. 3
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessorTest.java
  21. 87
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/BaseConnectionParam.java
  22. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/test/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTaskTest.java

39
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java

@ -381,21 +381,30 @@ public class DataSourceServiceTest {
@Test
public void buildParameterWithDecodePassword() {
PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE, "true");
Map<String, String> other = new HashMap<>();
other.put("autoDeserialize", "yes");
other.put("allowUrlInLocalInfile", "true");
MySQLDataSourceParamDTO mysqlDatasourceParamDTO = new MySQLDataSourceParamDTO();
mysqlDatasourceParamDTO.setHost("192.168.9.1");
mysqlDatasourceParamDTO.setPort(1521);
mysqlDatasourceParamDTO.setDatabase("im");
mysqlDatasourceParamDTO.setUserName("test");
mysqlDatasourceParamDTO.setPassword("123456");
mysqlDatasourceParamDTO.setOther(other);
ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(mysqlDatasourceParamDTO);
String expected = "{\"user\":\"test\",\"password\":\"IUAjJCVeJipNVEl6TkRVMg==\",\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/"
+ "im\",\"driverClassName\":\"com.mysql.cj.jdbc.Driver\",\"validationQuery\":\"select 1\",\"props\":{\"autoDeserialize\":\"yes\",\"allowUrlInLocalInfile\":\"true\"}}";
Assert.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
try (MockedStatic<PropertyUtils> mockedStaticPropertyUtils = Mockito.mockStatic(PropertyUtils.class)) {
mockedStaticPropertyUtils
.when(() -> PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE, false))
.thenReturn(true);
Map<String, String> other = new HashMap<>();
other.put("autoDeserialize", "yes");
other.put("allowUrlInLocalInfile", "true");
other.put("useSSL", "true");
MySQLDataSourceParamDTO mysqlDatasourceParamDTO = new MySQLDataSourceParamDTO();
mysqlDatasourceParamDTO.setHost("192.168.9.1");
mysqlDatasourceParamDTO.setPort(1521);
mysqlDatasourceParamDTO.setDatabase("im");
mysqlDatasourceParamDTO.setUserName("test");
mysqlDatasourceParamDTO.setPassword("123456");
mysqlDatasourceParamDTO.setOther(other);
ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(mysqlDatasourceParamDTO);
String expected =
"{\"user\":\"test\",\"password\":\"bnVsbE1USXpORFUy\",\"address\":\"jdbc:mysql://192.168.9.1:1521\","
+
"\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/im\",\"driverClassName\":\"com.mysql.cj.jdbc.Driver\","
+
"\"validationQuery\":\"select 1\",\"other\":{\"autoDeserialize\":\"yes\",\"allowUrlInLocalInfile\":\"true\",\"useSSL\":\"true\"}}";
Assertions.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
}
PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE, "false");
mysqlDatasourceParamDTO = new MySQLDataSourceParamDTO();

14
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessor.java

@ -17,17 +17,23 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.Map;
import java.util.regex.Pattern;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Sets;
public abstract class AbstractDataSourceProcessor implements DataSourceProcessor {
private static final Pattern IPV4_PATTERN = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.\\,]+$");
@ -82,6 +88,14 @@ public abstract class AbstractDataSourceProcessor implements DataSourceProcessor
}
}
protected Map<String, String> transformOtherParamToMap(String other) {
if (StringUtils.isBlank(other)) {
return Collections.emptyMap();
}
return JSONUtils.parseObject(other, new TypeReference<Map<String, String>>() {
});
}
@Override
public String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) {
BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;

29
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/clickhouse/ClickHouseDataSourceProcessor.java

@ -31,7 +31,8 @@ import org.apache.commons.collections4.MapUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor {
@ -43,7 +44,7 @@ public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor {
ClickHouseDataSourceParamDTO clickHouseDatasourceParamDTO = new ClickHouseDataSourceParamDTO();
clickHouseDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
clickHouseDatasourceParamDTO.setUserName(connectionParams.getUser());
clickHouseDatasourceParamDTO.setOther(parseOther(connectionParams.getOther()));
clickHouseDatasourceParamDTO.setOther(connectionParams.getOther());
String[] hostSeperator = connectionParams.getAddress().split(Constants.DOUBLE_SLASH);
String[] hostPortArray = hostSeperator[hostSeperator.length - 1].split(Constants.COMMA);
@ -67,8 +68,7 @@ public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor {
clickhouseConnectionParam.setPassword(PasswordUtils.encodePassword(clickHouseParam.getPassword()));
clickhouseConnectionParam.setDriverClassName(getDatasourceDriver());
clickhouseConnectionParam.setValidationQuery(getValidationQuery());
clickhouseConnectionParam.setOther(transformOther(clickHouseParam.getOther()));
clickhouseConnectionParam.setProps(clickHouseParam.getOther());
clickhouseConnectionParam.setOther(clickHouseParam.getOther());
return clickhouseConnectionParam;
}
@ -91,8 +91,8 @@ public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor {
public String getJdbcUrl(ConnectionParam connectionParam) {
ClickHouseConnectionParam clickhouseConnectionParam = (ClickHouseConnectionParam) connectionParam;
String jdbcUrl = clickhouseConnectionParam.getJdbcUrl();
if (!StringUtils.isEmpty(clickhouseConnectionParam.getOther())) {
jdbcUrl = String.format("%s?%s", jdbcUrl, clickhouseConnectionParam.getOther());
if (MapUtils.isNotEmpty(clickhouseConnectionParam.getOther())) {
jdbcUrl = String.format("%s?%s", jdbcUrl, transformOther(clickhouseConnectionParam.getOther()));
}
return jdbcUrl;
}
@ -114,20 +114,9 @@ public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor {
if (MapUtils.isEmpty(otherMap)) {
return null;
}
StringBuilder stringBuilder = new StringBuilder();
otherMap.forEach((key, value) -> stringBuilder.append(String.format("%s=%s%s", key, value, "&")));
return stringBuilder.toString();
List<String> otherList = new ArrayList<>();
otherMap.forEach((key, value) -> otherList.add(String.format("%s=%s", key, value)));
return String.join("&", otherList);
}
private Map<String, String> parseOther(String other) {
if (other == null) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
String[] configs = other.split("&");
for (String config : configs) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
}

30
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessor.java

@ -32,7 +32,8 @@ import org.apache.commons.collections4.MapUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class Db2DataSourceProcessor extends AbstractDataSourceProcessor {
@ -43,7 +44,7 @@ public class Db2DataSourceProcessor extends AbstractDataSourceProcessor {
Db2DataSourceParamDTO db2DatasourceParamDTO = new Db2DataSourceParamDTO();
db2DatasourceParamDTO.setDatabase(connectionParams.getDatabase());
db2DatasourceParamDTO.setOther(parseOther(connectionParams.getOther()));
db2DatasourceParamDTO.setOther(connectionParams.getOther());
db2DatasourceParamDTO.setUserName(db2DatasourceParamDTO.getUserName());
String[] hostSeperator = connectionParams.getAddress().split(Constants.DOUBLE_SLASH);
@ -68,8 +69,7 @@ public class Db2DataSourceProcessor extends AbstractDataSourceProcessor {
db2ConnectionParam.setPassword(PasswordUtils.encodePassword(db2Param.getPassword()));
db2ConnectionParam.setDriverClassName(getDatasourceDriver());
db2ConnectionParam.setValidationQuery(getValidationQuery());
db2ConnectionParam.setOther(transformOther(db2Param.getOther()));
db2ConnectionParam.setProps(db2Param.getOther());
db2ConnectionParam.setOther(db2Param.getOther());
return db2ConnectionParam;
}
@ -87,8 +87,9 @@ public class Db2DataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public String getJdbcUrl(ConnectionParam connectionParam) {
Db2ConnectionParam db2ConnectionParam = (Db2ConnectionParam) connectionParam;
if (!StringUtils.isEmpty(db2ConnectionParam.getOther())) {
return String.format("%s;%s", db2ConnectionParam.getJdbcUrl(), db2ConnectionParam.getOther());
if (MapUtils.isNotEmpty(db2ConnectionParam.getOther())) {
return String.format("%s;%s", db2ConnectionParam.getJdbcUrl(),
transformOther(db2ConnectionParam.getOther()));
}
return db2ConnectionParam.getJdbcUrl();
}
@ -115,20 +116,9 @@ public class Db2DataSourceProcessor extends AbstractDataSourceProcessor {
if (MapUtils.isEmpty(otherMap)) {
return null;
}
StringBuilder stringBuilder = new StringBuilder();
otherMap.forEach((key, value) -> stringBuilder.append(String.format("%s=%s%s", key, value, ";")));
stringBuilder.deleteCharAt(stringBuilder.length() - 1);
return stringBuilder.toString();
List<String> otherList = new ArrayList<>();
otherMap.forEach((key, value) -> otherList.add(String.format("%s=%s", key, value)));
return String.join(";", otherList);
}
private Map<String, String> parseOther(String other) {
if (other == null) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
for (String config : other.split("&")) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
}

59
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/hive/HiveDataSourceProcessor.java

@ -34,7 +34,8 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class HiveDataSourceProcessor extends AbstractDataSourceProcessor {
@ -46,7 +47,7 @@ public class HiveDataSourceProcessor extends AbstractDataSourceProcessor {
hiveDataSourceParamDTO.setDatabase(hiveConnectionParam.getDatabase());
hiveDataSourceParamDTO.setUserName(hiveConnectionParam.getUser());
hiveDataSourceParamDTO.setOther(parseOther(hiveConnectionParam.getOther()));
hiveDataSourceParamDTO.setOther(hiveConnectionParam.getOther());
hiveDataSourceParamDTO.setLoginUserKeytabUsername(hiveConnectionParam.getLoginUserKeytabUsername());
hiveDataSourceParamDTO.setLoginUserKeytabPath(hiveConnectionParam.getLoginUserKeytabPath());
hiveDataSourceParamDTO.setJavaSecurityKrb5Conf(hiveConnectionParam.getJavaSecurityKrb5Conf());
@ -90,8 +91,7 @@ public class HiveDataSourceProcessor extends AbstractDataSourceProcessor {
hiveConnectionParam.setLoginUserKeytabPath(hiveParam.getLoginUserKeytabPath());
hiveConnectionParam.setLoginUserKeytabUsername(hiveParam.getLoginUserKeytabUsername());
}
hiveConnectionParam.setOther(transformOther(hiveParam.getOther()));
hiveConnectionParam.setProps(hiveParam.getOther());
hiveConnectionParam.setOther(hiveParam.getOther());
return hiveConnectionParam;
}
@ -114,11 +114,10 @@ public class HiveDataSourceProcessor extends AbstractDataSourceProcessor {
public String getJdbcUrl(ConnectionParam connectionParam) {
HiveConnectionParam hiveConnectionParam = (HiveConnectionParam) connectionParam;
String jdbcUrl = hiveConnectionParam.getJdbcUrl();
String otherParams = filterOther(hiveConnectionParam.getOther());
if (StringUtils.isNotEmpty(otherParams) && !"?".equals(otherParams.substring(0, 1))) {
jdbcUrl += ";";
if (MapUtils.isNotEmpty(hiveConnectionParam.getOther())) {
return jdbcUrl + "?" + transformOther(hiveConnectionParam.getOther());
}
return jdbcUrl + otherParams;
return jdbcUrl;
}
@Override
@ -140,47 +139,9 @@ public class HiveDataSourceProcessor extends AbstractDataSourceProcessor {
if (MapUtils.isEmpty(otherMap)) {
return null;
}
StringBuilder stringBuilder = new StringBuilder();
otherMap.forEach((key, value) -> stringBuilder.append(String.format("%s=%s;", key, value)));
return stringBuilder.toString();
List<String> otherList = new ArrayList<>();
otherMap.forEach((key, value) -> otherList.add(String.format("%s=%s", key, value)));
return String.join(";", otherList);
}
private String filterOther(String otherParams) {
if (StringUtils.isBlank(otherParams)) {
return "";
}
StringBuilder hiveConfListSb = new StringBuilder();
hiveConfListSb.append("?");
StringBuilder sessionVarListSb = new StringBuilder();
String[] otherArray = otherParams.split(";", -1);
for (String conf : otherArray) {
sessionVarListSb.append(conf).append(";");
}
// remove the last ";"
if (sessionVarListSb.length() > 0) {
sessionVarListSb.deleteCharAt(sessionVarListSb.length() - 1);
}
if (hiveConfListSb.length() > 0) {
hiveConfListSb.deleteCharAt(hiveConfListSb.length() - 1);
}
return sessionVarListSb.toString() + hiveConfListSb.toString();
}
private Map<String, String> parseOther(String other) {
if (other == null) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
String[] configs = other.split(";");
for (String config : configs) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
}

29
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/mysql/MySQLDataSourceProcessor.java

@ -32,8 +32,9 @@ import org.apache.commons.collections4.MapUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
@ -60,7 +61,7 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
mysqlDatasourceParamDTO.setUserName(connectionParams.getUser());
mysqlDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
mysqlDatasourceParamDTO.setOther(parseOther(connectionParams.getOther()));
mysqlDatasourceParamDTO.setOther(connectionParams.getOther());
String address = connectionParams.getAddress();
String[] hostSeperator = address.split(Constants.DOUBLE_SLASH);
@ -85,8 +86,7 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
mysqlConnectionParam.setPassword(PasswordUtils.encodePassword(mysqlDatasourceParam.getPassword()));
mysqlConnectionParam.setDriverClassName(getDatasourceDriver());
mysqlConnectionParam.setValidationQuery(getValidationQuery());
mysqlConnectionParam.setOther(transformOther(mysqlDatasourceParam.getOther()));
mysqlConnectionParam.setProps(mysqlDatasourceParam.getOther());
mysqlConnectionParam.setOther(mysqlDatasourceParam.getOther());
return mysqlConnectionParam;
}
@ -110,8 +110,8 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
public String getJdbcUrl(ConnectionParam connectionParam) {
MySQLConnectionParam mysqlConnectionParam = (MySQLConnectionParam) connectionParam;
String jdbcUrl = mysqlConnectionParam.getJdbcUrl();
if (!StringUtils.isEmpty(mysqlConnectionParam.getOther())) {
return String.format("%s?%s&%s", jdbcUrl, mysqlConnectionParam.getOther(), APPEND_PARAMS);
if (MapUtils.isNotEmpty(mysqlConnectionParam.getOther())) {
return String.format("%s?%s&%s", jdbcUrl, transformOther(mysqlConnectionParam.getOther()), APPEND_PARAMS);
}
return String.format("%s?%s", jdbcUrl, APPEND_PARAMS);
}
@ -152,9 +152,9 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
if (MapUtils.isEmpty(otherMap)) {
return null;
}
StringBuilder stringBuilder = new StringBuilder();
otherMap.forEach((key, value) -> stringBuilder.append(String.format("%s=%s&", key, value)));
return stringBuilder.toString();
List<String> otherList = new ArrayList<>();
otherMap.forEach((key, value) -> otherList.add(String.format("%s=%s", key, value)));
return String.join("&", otherList);
}
private static boolean checkKeyIsLegitimate(String key) {
@ -164,15 +164,4 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
&& !key.contains(ALLOW_URL_IN_LOCAL_IN_FILE_NAME);
}
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
for (String config : other.split("&")) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
}

28
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessor.java

@ -34,7 +34,6 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -47,7 +46,7 @@ public class OracleDataSourceProcessor extends AbstractDataSourceProcessor {
oracleDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
oracleDatasourceParamDTO.setUserName(connectionParams.getUser());
oracleDatasourceParamDTO.setOther(parseOther(connectionParams.getOther()));
oracleDatasourceParamDTO.setOther(connectionParams.getOther());
String hostSeperator = Constants.DOUBLE_SLASH;
if (DbConnectType.ORACLE_SID.equals(connectionParams.connectType)) {
@ -85,8 +84,7 @@ public class OracleDataSourceProcessor extends AbstractDataSourceProcessor {
oracleConnectionParam.setConnectType(oracleParam.getConnectType());
oracleConnectionParam.setDriverClassName(getDatasourceDriver());
oracleConnectionParam.setValidationQuery(getValidationQuery());
oracleConnectionParam.setOther(transformOther(oracleParam.getOther()));
oracleConnectionParam.setProps(oracleParam.getOther());
oracleConnectionParam.setOther(oracleParam.getOther());
return oracleConnectionParam;
}
@ -109,8 +107,9 @@ public class OracleDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public String getJdbcUrl(ConnectionParam connectionParam) {
OracleConnectionParam oracleConnectionParam = (OracleConnectionParam) connectionParam;
if (!StringUtils.isEmpty(oracleConnectionParam.getOther())) {
return String.format("%s?%s", oracleConnectionParam.getJdbcUrl(), oracleConnectionParam.getOther());
if (MapUtils.isNotEmpty(oracleConnectionParam.getOther())) {
return String.format("%s?%s", oracleConnectionParam.getJdbcUrl(),
transformOther(oracleConnectionParam.getOther()));
}
return oracleConnectionParam.getJdbcUrl();
}
@ -132,20 +131,9 @@ public class OracleDataSourceProcessor extends AbstractDataSourceProcessor {
if (MapUtils.isEmpty(otherMap)) {
return null;
}
List<String> list = new ArrayList<>();
otherMap.forEach((key, value) -> list.add(String.format("%s=%s", key, value)));
return String.join("&", list);
List<String> otherList = new ArrayList<>();
otherMap.forEach((key, value) -> otherList.add(String.format("%s=%s", key, value)));
return String.join("&", otherList);
}
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
String[] configs = other.split("&");
for (String config : configs) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
}

34
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessor.java

@ -32,9 +32,14 @@ import org.apache.commons.lang.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.google.auto.service.AutoService;
import com.google.common.base.Strings;
@AutoService(DataSourceProcessor.class)
public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
@ -43,7 +48,7 @@ public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
PostgreSQLDataSourceParamDTO postgreSqlDatasourceParamDTO = new PostgreSQLDataSourceParamDTO();
postgreSqlDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
postgreSqlDatasourceParamDTO.setUserName(connectionParams.getUser());
postgreSqlDatasourceParamDTO.setOther(parseOther(connectionParams.getOther()));
postgreSqlDatasourceParamDTO.setOther(connectionParams.getOther());
String address = connectionParams.getAddress();
String[] hostSeperator = address.split(Constants.DOUBLE_SLASH);
@ -68,8 +73,7 @@ public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
postgreSqlConnectionParam.setPassword(PasswordUtils.encodePassword(postgreSqlParam.getPassword()));
postgreSqlConnectionParam.setDriverClassName(getDatasourceDriver());
postgreSqlConnectionParam.setValidationQuery(getValidationQuery());
postgreSqlConnectionParam.setOther(transformOther(postgreSqlParam.getOther()));
postgreSqlConnectionParam.setProps(postgreSqlParam.getOther());
postgreSqlConnectionParam.setOther(postgreSqlParam.getOther());
return postgreSqlConnectionParam;
}
@ -92,8 +96,9 @@ public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public String getJdbcUrl(ConnectionParam connectionParam) {
PostgreSQLConnectionParam postgreSqlConnectionParam = (PostgreSQLConnectionParam) connectionParam;
if (!StringUtils.isEmpty(postgreSqlConnectionParam.getOther())) {
return String.format("%s?%s", postgreSqlConnectionParam.getJdbcUrl(), postgreSqlConnectionParam.getOther());
if (MapUtils.isNotEmpty(postgreSqlConnectionParam.getOther())) {
return String.format("%s?%s", postgreSqlConnectionParam.getJdbcUrl(),
transformOther(postgreSqlConnectionParam.getOther()));
}
return postgreSqlConnectionParam.getJdbcUrl();
}
@ -115,20 +120,9 @@ public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
if (MapUtils.isEmpty(otherMap)) {
return null;
}
StringBuilder stringBuilder = new StringBuilder();
otherMap.forEach((key, value) -> stringBuilder.append(String.format("%s=%s&", key, value)));
return stringBuilder.toString();
List<String> otherList = new ArrayList<>();
otherMap.forEach((key, value) -> otherList.add(String.format("%s=%s", key, value)));
return String.join("&", otherList);
}
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
for (String config : other.split("&")) {
String[] split = config.split("=");
otherMap.put(split[0], split[1]);
}
return otherMap;
}
}

22
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessor.java

@ -33,7 +33,6 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -51,7 +50,7 @@ public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor {
prestoDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]);
prestoDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
prestoDatasourceParamDTO.setUserName(connectionParams.getUser());
prestoDatasourceParamDTO.setOther(parseOther(connectionParams.getOther()));
prestoDatasourceParamDTO.setOther(connectionParams.getOther());
return prestoDatasourceParamDTO;
}
@ -65,13 +64,12 @@ public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor {
PrestoConnectionParam prestoConnectionParam = new PrestoConnectionParam();
prestoConnectionParam.setUser(prestoParam.getUserName());
prestoConnectionParam.setPassword(PasswordUtils.encodePassword(prestoParam.getPassword()));
prestoConnectionParam.setOther(transformOther(prestoParam.getOther()));
prestoConnectionParam.setOther(prestoParam.getOther());
prestoConnectionParam.setAddress(address);
prestoConnectionParam.setJdbcUrl(jdbcUrl);
prestoConnectionParam.setDatabase(prestoParam.getDatabase());
prestoConnectionParam.setDriverClassName(getDatasourceDriver());
prestoConnectionParam.setValidationQuery(getValidationQuery());
prestoConnectionParam.setProps(prestoParam.getOther());
return prestoConnectionParam;
}
@ -94,8 +92,9 @@ public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public String getJdbcUrl(ConnectionParam connectionParam) {
PrestoConnectionParam prestoConnectionParam = (PrestoConnectionParam) connectionParam;
if (!StringUtils.isEmpty(prestoConnectionParam.getOther())) {
return String.format("%s?%s", prestoConnectionParam.getJdbcUrl(), prestoConnectionParam.getOther());
if (MapUtils.isNotEmpty(prestoConnectionParam.getOther())) {
return String.format("%s?%s", prestoConnectionParam.getJdbcUrl(),
transformOther(prestoConnectionParam.getOther()));
}
return prestoConnectionParam.getJdbcUrl();
}
@ -122,15 +121,4 @@ public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor {
return null;
}
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
String[] configs = other.split("&");
for (String config : configs) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
}

25
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessor.java

@ -33,7 +33,6 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -53,7 +52,7 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
redshiftDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]);
redshiftDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
redshiftDatasourceParamDTO.setUserName(connectionParams.getUser());
redshiftDatasourceParamDTO.setOther(parseOther(connectionParams.getOther()));
redshiftDatasourceParamDTO.setOther(connectionParams.getOther());
return redshiftDatasourceParamDTO;
}
@ -68,13 +67,12 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
redshiftConnectionParam = new RedshiftConnectionParam();
redshiftConnectionParam.setUser(redshiftParam.getUserName());
redshiftConnectionParam.setPassword(PasswordUtils.encodePassword(redshiftParam.getPassword()));
redshiftConnectionParam.setOther(transformOther(redshiftParam.getOther()));
redshiftConnectionParam.setOther(redshiftParam.getOther());
redshiftConnectionParam.setAddress(address);
redshiftConnectionParam.setJdbcUrl(jdbcUrl);
redshiftConnectionParam.setDatabase(redshiftParam.getDatabase());
redshiftConnectionParam.setDriverClassName(getDatasourceDriver());
redshiftConnectionParam.setValidationQuery(getValidationQuery());
redshiftConnectionParam.setProps(redshiftParam.getOther());
return redshiftConnectionParam;
}
@ -96,10 +94,10 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public String getJdbcUrl(ConnectionParam connectionParam) {
RedshiftConnectionParam
redshiftConnectionParam = (RedshiftConnectionParam) connectionParam;
if (!StringUtils.isEmpty(redshiftConnectionParam.getOther())) {
return String.format("%s?%s", redshiftConnectionParam.getJdbcUrl(), redshiftConnectionParam.getOther());
RedshiftConnectionParam redshiftConnectionParam = (RedshiftConnectionParam) connectionParam;
if (MapUtils.isNotEmpty(redshiftConnectionParam.getOther())) {
return String.format("%s?%s", redshiftConnectionParam.getJdbcUrl(),
transformOther(redshiftConnectionParam.getOther()));
}
return redshiftConnectionParam.getJdbcUrl();
}
@ -126,15 +124,4 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
return null;
}
private Map<String, String> parseOther(String other) {
Map<String, String> otherMap = new LinkedHashMap<>();
if (StringUtils.isEmpty(other)) {
return otherMap;
}
String[] configs = other.split(Constants.SEMICOLON);
for (String config : configs) {
otherMap.put(config.split(Constants.EQUAL_SIGN)[0], config.split(Constants.EQUAL_SIGN)[1]);
}
return otherMap;
}
}

22
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessor.java

@ -35,7 +35,6 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -49,7 +48,7 @@ public class SparkDataSourceProcessor extends AbstractDataSourceProcessor {
SparkDataSourceParamDTO sparkDatasourceParamDTO = new SparkDataSourceParamDTO();
sparkDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
sparkDatasourceParamDTO.setUserName(connectionParams.getUser());
sparkDatasourceParamDTO.setOther(parseOther(connectionParams.getOther()));
sparkDatasourceParamDTO.setOther(connectionParams.getOther());
sparkDatasourceParamDTO.setJavaSecurityKrb5Conf(connectionParams.getJavaSecurityKrb5Conf());
sparkDatasourceParamDTO.setLoginUserKeytabPath(connectionParams.getLoginUserKeytabPath());
sparkDatasourceParamDTO.setLoginUserKeytabUsername(connectionParams.getLoginUserKeytabUsername());
@ -81,13 +80,12 @@ public class SparkDataSourceProcessor extends AbstractDataSourceProcessor {
SparkConnectionParam sparkConnectionParam = new SparkConnectionParam();
sparkConnectionParam.setPassword(PasswordUtils.encodePassword(sparkDatasourceParam.getPassword()));
sparkConnectionParam.setUser(sparkDatasourceParam.getUserName());
sparkConnectionParam.setOther(transformOther(sparkDatasourceParam.getOther()));
sparkConnectionParam.setOther(sparkDatasourceParam.getOther());
sparkConnectionParam.setDatabase(sparkDatasourceParam.getDatabase());
sparkConnectionParam.setAddress(address.toString());
sparkConnectionParam.setJdbcUrl(jdbcUrl);
sparkConnectionParam.setDriverClassName(getDatasourceDriver());
sparkConnectionParam.setValidationQuery(getValidationQuery());
sparkConnectionParam.setProps(sparkDatasourceParam.getOther());
if (CommonUtils.getKerberosStartupState()) {
sparkConnectionParam.setPrincipal(sparkDatasourceParam.getPrincipal());
@ -117,8 +115,9 @@ public class SparkDataSourceProcessor extends AbstractDataSourceProcessor {
@Override
public String getJdbcUrl(ConnectionParam connectionParam) {
SparkConnectionParam sparkConnectionParam = (SparkConnectionParam) connectionParam;
if (!StringUtils.isEmpty(sparkConnectionParam.getOther())) {
return String.format("%s;%s", sparkConnectionParam.getJdbcUrl(), sparkConnectionParam.getOther());
if (MapUtils.isNotEmpty(sparkConnectionParam.getOther())) {
return String.format("%s;%s", sparkConnectionParam.getJdbcUrl(),
transformOther(sparkConnectionParam.getOther()));
}
return sparkConnectionParam.getJdbcUrl();
}
@ -147,15 +146,4 @@ public class SparkDataSourceProcessor extends AbstractDataSourceProcessor {
return String.join(";", stringBuilder);
}
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
String[] configs = other.split(";");
for (String config : configs) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
}

21
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/sqlserver/SQLServerDataSourceProcessor.java

@ -32,7 +32,6 @@ import org.apache.commons.lang.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;
public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor {
@ -46,7 +45,7 @@ public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor {
SQLServerDataSourceParamDTO sqlServerDatasourceParamDTO = new SQLServerDataSourceParamDTO();
sqlServerDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
sqlServerDatasourceParamDTO.setUserName(connectionParams.getUser());
sqlServerDatasourceParamDTO.setOther(parseOther(connectionParams.getOther()));
sqlServerDatasourceParamDTO.setOther(connectionParams.getOther());
sqlServerDatasourceParamDTO.setPort(Integer.parseInt(hostPortArray[0].split(Constants.COLON)[1]));
sqlServerDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]);
return sqlServerDatasourceParamDTO;
@ -62,12 +61,11 @@ public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor {
sqlServerConnectionParam.setAddress(address);
sqlServerConnectionParam.setDatabase(sqlServerParam.getDatabase());
sqlServerConnectionParam.setJdbcUrl(jdbcUrl);
sqlServerConnectionParam.setOther(transformOther(sqlServerParam.getOther()));
sqlServerConnectionParam.setOther(sqlServerParam.getOther());
sqlServerConnectionParam.setUser(sqlServerParam.getUserName());
sqlServerConnectionParam.setPassword(PasswordUtils.encodePassword(sqlServerParam.getPassword()));
sqlServerConnectionParam.setDriverClassName(getDatasourceDriver());
sqlServerConnectionParam.setValidationQuery(getValidationQuery());
sqlServerConnectionParam.setProps(sqlServerParam.getOther());
return sqlServerConnectionParam;
}
@ -90,8 +88,9 @@ public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor {
public String getJdbcUrl(ConnectionParam connectionParam) {
SQLServerConnectionParam sqlServerConnectionParam = (SQLServerConnectionParam) connectionParam;
if (!StringUtils.isEmpty(sqlServerConnectionParam.getOther())) {
return String.format("%s;%s", sqlServerConnectionParam.getJdbcUrl(), sqlServerConnectionParam.getOther());
if (MapUtils.isNotEmpty(sqlServerConnectionParam.getOther())) {
return String.format("%s;%s", sqlServerConnectionParam.getJdbcUrl(),
transformOther(sqlServerConnectionParam.getOther()));
}
return sqlServerConnectionParam.getJdbcUrl();
}
@ -118,14 +117,4 @@ public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor {
return stringBuilder.toString();
}
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
for (String config : other.split(";")) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
}

9
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/provider/JDBCDataSourceProvider.java

@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.utils.Constants;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.MapUtils;
import java.sql.Driver;
@ -56,8 +57,8 @@ public class JDBCDataSourceProvider {
dataSource.setMaximumPoolSize(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE, 50));
dataSource.setConnectionTestQuery(properties.getValidationQuery());
if (properties.getProps() != null) {
properties.getProps().forEach(dataSource::addDataSourceProperty);
if (MapUtils.isNotEmpty(properties.getOther())) {
properties.getOther().forEach(dataSource::addDataSourceProperty);
}
logger.info("Creating HikariDataSource pool success.");
@ -82,8 +83,8 @@ public class JDBCDataSourceProvider {
dataSource.setMaximumPoolSize(isOneSession ? 1 : PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE, 50));
dataSource.setConnectionTestQuery(properties.getValidationQuery());
if (properties.getProps() != null) {
properties.getProps().forEach(dataSource::addDataSourceProperty);
if (MapUtils.isNotEmpty(properties.getOther())) {
properties.getOther().forEach(dataSource::addDataSourceProperty);
}
logger.info("Creating OneSession HikariDataSource pool success.");

72
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessorTest.java

@ -17,10 +17,15 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
@ -43,4 +48,71 @@ public class AbstractDataSourceProcessorTest {
other.put("arg0", "%");
doThrow(new IllegalArgumentException()).when(mockDataSourceProcessor).checkOther(other);
}
@Test
public void shouldNotIncludeMaliciousParams() {
AbstractDataSourceProcessor mockDataSourceProcessor = mock(AbstractDataSourceProcessor.class);
Map<String, String> other = new HashMap<>();
other.put("allowLoadLocalInfile", "whatever");
doThrow(new IllegalArgumentException()).when(mockDataSourceProcessor).checkOther(other);
}
@org.junit.Test
public void transformOtherParamToMap() {
AbstractDataSourceProcessor abstractDataSourceProcessor = new AbstractDataSourceProcessor() {
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return null;
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
return null;
}
@Override
public ConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) {
return null;
}
@Override
public ConnectionParam createConnectionParams(String connectionJson) {
return null;
}
@Override
public String getDatasourceDriver() {
return null;
}
@Override
public String getValidationQuery() {
return null;
}
@Override
public String getJdbcUrl(ConnectionParam connectionParam) {
return null;
}
@Override
public Connection getConnection(ConnectionParam connectionParam) {
return null;
}
@Override
public DbType getDbType() {
return null;
}
@Override
public DataSourceProcessor create() {
return null;
}
};
String other = "{\"useSSL\": \"true\"}";
Map<String, String> stringStringMap = abstractDataSourceProcessor.transformOtherParamToMap(other);
assertEquals("true", stringStringMap.get("useSSL"));
}
}

7
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/db2/Db2DataSourceProcessorTest.java

@ -36,6 +36,8 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.google.common.collect.ImmutableMap;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Class.class, DriverManager.class, DataSourceUtils.class, CommonUtils.class, DataSourceClientProvider.class, PasswordUtils.class})
public class Db2DataSourceProcessorTest {
@ -83,7 +85,10 @@ public class Db2DataSourceProcessorTest {
public void testGetJdbcUrl() {
Db2ConnectionParam db2ConnectionParam = new Db2ConnectionParam();
db2ConnectionParam.setJdbcUrl("jdbc:db2://localhost:5142/default");
db2ConnectionParam.setOther("other=other");
ImmutableMap<String, String> map = new ImmutableMap.Builder<String, String>()
.put("other", "other")
.build();
db2ConnectionParam.setOther(map);
String jdbcUrl = db2DatasourceProcessor.getJdbcUrl(db2ConnectionParam);
Assert.assertEquals("jdbc:db2://localhost:5142/default;other=other", jdbcUrl);
}

9
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/oracle/OracleDataSourceProcessorTest.java

@ -37,6 +37,8 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.google.common.collect.ImmutableMap;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Class.class, DriverManager.class, DataSourceUtils.class, CommonUtils.class, DataSourceClientProvider.class, PasswordUtils.class})
public class OracleDataSourceProcessorTest {
@ -83,8 +85,11 @@ public class OracleDataSourceProcessorTest {
public void testGetJdbcUrl() {
OracleConnectionParam oracleConnectionParam = new OracleConnectionParam();
oracleConnectionParam.setJdbcUrl("jdbc:oracle:thin:@localhost:3308:default");
oracleConnectionParam.setOther("other=other");
Assert.assertEquals("jdbc:oracle:thin:@localhost:3308:default?other=other",
ImmutableMap<String, String> map = new ImmutableMap.Builder<String, String>()
.put("other", "other")
.build();
oracleConnectionParam.setOther(map);
Assertions.assertEquals("jdbc:oracle:thin:@localhost:3308:default?other=other",
oracleDatasourceProcessor.getJdbcUrl(oracleConnectionParam));
}

9
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/postgresql/PostgreSQLDataSourceProcessorTest.java

@ -36,6 +36,8 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.google.common.collect.ImmutableMap;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Class.class, DriverManager.class, DataSourceUtils.class, CommonUtils.class, DataSourceClientProvider.class, PasswordUtils.class})
public class PostgreSQLDataSourceProcessorTest {
@ -81,10 +83,13 @@ public class PostgreSQLDataSourceProcessorTest {
public void testGetJdbcUrl() {
PostgreSQLConnectionParam postgreSqlConnectionParam = new PostgreSQLConnectionParam();
postgreSqlConnectionParam.setJdbcUrl("jdbc:postgresql://localhost:3308/default");
postgreSqlConnectionParam.setOther("other");
ImmutableMap<String, String> map = new ImmutableMap.Builder<String, String>()
.put("other", "other")
.build();
postgreSqlConnectionParam.setOther(map);
String jdbcUrl = postgreSqlDatasourceProcessor.getJdbcUrl(postgreSqlConnectionParam);
Assert.assertEquals("jdbc:postgresql://localhost:3308/default?other", jdbcUrl);
Assertions.assertEquals("jdbc:postgresql://localhost:3308/default?other=other", jdbcUrl);
}

3
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/presto/PrestoDataSourceProcessorTest.java

@ -80,8 +80,7 @@ public class PrestoDataSourceProcessorTest {
public void testGetJdbcUrl() {
PrestoConnectionParam prestoConnectionParam = new PrestoConnectionParam();
prestoConnectionParam.setJdbcUrl("jdbc:postgresql://localhost:1234/default");
prestoConnectionParam.setOther("other");
Assert.assertEquals("jdbc:postgresql://localhost:1234/default?other",
Assertions.assertEquals("jdbc:postgresql://localhost:1234/default",
prestoDatasourceProcessor.getJdbcUrl(prestoConnectionParam));
}

12
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/redshift/RedshiftDataSourceProcessorTest.java

@ -36,6 +36,8 @@ import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import com.google.common.collect.ImmutableMap;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Class.class, DriverManager.class, DataSourceUtils.class, CommonUtils.class, DataSourceClientProvider.class, PasswordUtils.class})
public class RedshiftDataSourceProcessorTest {
@ -80,9 +82,13 @@ public class RedshiftDataSourceProcessorTest {
public void testGetJdbcUrl() {
RedshiftConnectionParam redshiftConnectionParam = new RedshiftConnectionParam();
redshiftConnectionParam.setJdbcUrl("jdbc:redshift://localhost:5439/default");
redshiftConnectionParam.setOther("DSILogLevel=6;defaultRowFetchSize=100");
Assert.assertEquals("jdbc:redshift://localhost:5439/default?DSILogLevel=6;defaultRowFetchSize=100",
redshiftDatasourceProcessor.getJdbcUrl(redshiftConnectionParam));
ImmutableMap<String, String> map = new ImmutableMap.Builder<String, String>()
.put("DSILogLevel", "6")
.put("defaultRowFetchSize", "100")
.build();
redshiftConnectionParam.setOther(map);
Assertions.assertEquals("jdbc:redshift://localhost:5439/default?DSILogLevel=6;defaultRowFetchSize=100",
redshiftDatasourceProcessor.getJdbcUrl(redshiftConnectionParam));
}

3
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/spark/SparkDataSourceProcessorTest.java

@ -82,8 +82,7 @@ public class SparkDataSourceProcessorTest {
public void testGetJdbcUrl() {
SparkConnectionParam sparkConnectionParam = new SparkConnectionParam();
sparkConnectionParam.setJdbcUrl("jdbc:hive2://localhost1:1234,localhost2:1234/default");
sparkConnectionParam.setOther("other");
Assert.assertEquals("jdbc:hive2://localhost1:1234,localhost2:1234/default;other",
Assertions.assertEquals("jdbc:hive2://localhost1:1234,localhost2:1234/default",
sparkDatasourceProcessor.getJdbcUrl(sparkConnectionParam));
}

87
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/BaseConnectionParam.java

@ -17,12 +17,14 @@
package org.apache.dolphinscheduler.spi.datasource;
import java.util.HashMap;
import java.util.Map;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
@Data
@JsonInclude(Include.NON_NULL)
public abstract class BaseConnectionParam implements ConnectionParam {
@ -42,87 +44,6 @@ public abstract class BaseConnectionParam implements ConnectionParam {
protected String validationQuery;
protected String other;
private Map<String, String> props = new HashMap<>();
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getDatabase() {
return database;
}
public void setDatabase(String database) {
this.database = database;
}
public String getJdbcUrl() {
return jdbcUrl;
}
public void setJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
}
public String getDriverLocation() {
return driverLocation;
}
public void setDriverLocation(String driverLocation) {
this.driverLocation = driverLocation;
}
public String getDriverClassName() {
return driverClassName;
}
public void setDriverClassName(String driverClassName) {
this.driverClassName = driverClassName;
}
public String getValidationQuery() {
return validationQuery;
}
public void setValidationQuery(String validationQuery) {
this.validationQuery = validationQuery;
}
public String getOther() {
return other;
}
public void setOther(String other) {
this.other = other;
}
public Map<String, String> getProps() {
return props;
}
protected Map<String, String> other;
public void setProps(Map<String, String> props) {
this.props = props;
}
}

10
dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/test/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTaskTest.java

@ -494,7 +494,7 @@ public class DataQualityTaskTest {
+ "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\","
+ "\"user\":\"test\","
+ "\"password\":\"test\","
+ "\"other\":\"stringtype=unspecified&characterEncoding=UTF-8&allowMultiQueries=true\"}");
+ "\"other\":{\"stringtype\": \"unspecified\", \"characterEncoding\" : \"UTF-8\", \"allowMultiQueries\": true}}");
dataQualityTaskExecutionContext.setStatisticsValueConnectorType("JDBC");
dataQualityTaskExecutionContext.setStatisticsValueType(1);
@ -505,7 +505,7 @@ public class DataQualityTaskTest {
+ "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\","
+ "\"user\":\"test\","
+ "\"password\":\"test\","
+ "\"other\":\"stringtype=unspecified&characterEncoding=UTF-8&allowMultiQueries=true\"}");
+ "\"other\":{\"stringtype\": \"unspecified\", \"characterEncoding\" : \"UTF-8\", \"allowMultiQueries\": true}}");
dataQualityTaskExecutionContext.setCompareWithFixedValue(true);
@ -791,7 +791,7 @@ public class DataQualityTaskTest {
+ "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\","
+ "\"user\":\"test\","
+ "\"password\":\"test\","
+ "\"other\":\"stringtype=unspecified&characterEncoding=UTF-8&allowMultiQueries=true\"}");
+ "\"other\":{\"stringtype\": \"unspecified\", \"characterEncoding\": \"UTF-8\", \"allowMultiQueries\": true}}");
String expect = "{\"name\":\"跨表值比对\",\"env\":{\"type\":\"batch\",\"config\":null},\"readers\""
+ ":[{\"type\":\"JDBC\",\"config\":{\"database\":\"test\",\"password\":\"test\",\"driver\":"
@ -1115,7 +1115,7 @@ public class DataQualityTaskTest {
+ "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\","
+ "\"user\":\"test\","
+ "\"password\":\"test\","
+ "\"other\":\"stringtype=unspecified&characterEncoding=UTF-8&allowMultiQueries=true\"}");
+ "\"other\":{\"stringtype\": \"unspecified\", \"characterEncoding\": \"UTF-8\", \"allowMultiQueries\": true}}");
dataQualityTaskExecutionContext.setStatisticsValueConnectorType("JDBC");
dataQualityTaskExecutionContext.setStatisticsValueType(1);
@ -1126,7 +1126,7 @@ public class DataQualityTaskTest {
+ "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\","
+ "\"user\":\"test\","
+ "\"password\":\"test\","
+ "\"other\":\"stringtype=unspecified&characterEncoding=UTF-8&allowMultiQueries=true\"}");
+ "\"other\":{\"stringtype\": \"unspecified\", \"characterEncoding\": \"UTF-8\", \"allowMultiQueries\": true }}");
dataQualityTaskExecutionContext.setRuleName("跨表准确性");
dataQualityTaskExecutionContext.setRuleType(RuleType.MULTI_TABLE_ACCURACY.getCode());

Loading…
Cancel
Save