Browse Source

Fix datasource jdbc url param doesn't work (#13098)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
ab96a3d0c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
  2. 12
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessor.java
  3. 9
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/provider/JDBCDataSourceProvider.java
  4. 64
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessorTest.java
  5. 24
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-athena/src/main/java/org/apache/dolphinscheduler/plugin/datasource/athena/param/AthenaDataSourceProcessor.java
  6. 13
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-athena/src/test/java/org/apache/dolphinscheduler/plugin/datasource/athena/param/AthenaDataSourceProcessorTest.java
  7. 30
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-clickhouse/src/main/java/org/apache/dolphinscheduler/plugin/datasource/clickhouse/param/ClickHouseDataSourceProcessor.java
  8. 31
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-db2/src/main/java/org/apache/dolphinscheduler/plugin/datasource/db2/param/Db2DataSourceProcessor.java
  9. 7
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-db2/src/test/java/org/apache/dolphinscheduler/plugin/datasource/db2/param/Db2DataSourceProcessorTest.java
  10. 60
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/param/HiveDataSourceProcessor.java
  11. 30
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-mysql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/mysql/param/MySQLDataSourceProcessor.java
  12. 29
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-oracle/src/main/java/org/apache/dolphinscheduler/plugin/datasource/oracle/param/OracleDataSourceProcessor.java
  13. 7
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-oracle/src/test/java/org/apache/dolphinscheduler/plugin/datasource/oracle/param/OracleDataSourceProcessorTest.java
  14. 31
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-postgresql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/postgresql/param/PostgreSQLDataSourceProcessor.java
  15. 9
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-postgresql/src/test/java/org/apache/dolphinscheduler/plugin/datasource/postgresql/param/PostgreSQLDataSourceProcessorTest.java
  16. 23
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-presto/src/main/java/org/apache/dolphinscheduler/plugin/datasource/presto/param/PrestoDataSourceProcessor.java
  17. 3
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-presto/src/test/java/org/apache/dolphinscheduler/plugin/datasource/presto/param/PrestoDataSourceProcessorTest.java
  18. 23
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/param/RedshiftDataSourceProcessor.java
  19. 8
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/test/java/org/apache/dolphinscheduler/plugin/datasource/redshift/param/RedshiftDataSourceProcessorTest.java
  20. 23
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-spark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/spark/param/SparkDataSourceProcessor.java
  21. 3
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-spark/src/test/java/org/apache/dolphinscheduler/plugin/datasource/spark/param/SparkDataSourceProcessorTest.java
  22. 22
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sqlserver/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sqlserver/param/SQLServerDataSourceProcessor.java
  23. 3
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sqlserver/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sqlserver/param/SQLServerDataSourceProcessorTest.java
  24. 23
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-trino/src/main/java/org/apache/dolphinscheduler/plugin/datasource/trino/param/TrinoDataSourceProcessor.java
  25. 3
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-trino/src/test/java/org/apache/dolphinscheduler/plugin/datasource/trino/param/TrinoDataSourceProcessorTest.java
  26. 87
      dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/BaseConnectionParam.java
  27. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/test/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTaskTest.java

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java

@ -428,6 +428,7 @@ public class DataSourceServiceTest {
Map<String, String> other = new HashMap<>(); Map<String, String> other = new HashMap<>();
other.put("autoDeserialize", "yes"); other.put("autoDeserialize", "yes");
other.put("allowUrlInLocalInfile", "true"); other.put("allowUrlInLocalInfile", "true");
other.put("useSSL", "true");
MySQLDataSourceParamDTO mysqlDatasourceParamDTO = new MySQLDataSourceParamDTO(); MySQLDataSourceParamDTO mysqlDatasourceParamDTO = new MySQLDataSourceParamDTO();
mysqlDatasourceParamDTO.setHost("192.168.9.1"); mysqlDatasourceParamDTO.setHost("192.168.9.1");
mysqlDatasourceParamDTO.setPort(1521); mysqlDatasourceParamDTO.setPort(1521);
@ -437,8 +438,11 @@ public class DataSourceServiceTest {
mysqlDatasourceParamDTO.setOther(other); mysqlDatasourceParamDTO.setOther(other);
ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(mysqlDatasourceParamDTO); ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(mysqlDatasourceParamDTO);
String expected = String expected =
"{\"user\":\"test\",\"password\":\"bnVsbE1USXpORFUy\",\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/" "{\"user\":\"test\",\"password\":\"bnVsbE1USXpORFUy\",\"address\":\"jdbc:mysql://192.168.9.1:1521\","
+ "im\",\"driverClassName\":\"com.mysql.cj.jdbc.Driver\",\"validationQuery\":\"select 1\",\"props\":{\"autoDeserialize\":\"yes\",\"allowUrlInLocalInfile\":\"true\"}}"; +
"\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/im\",\"driverClassName\":\"com.mysql.cj.jdbc.Driver\","
+
"\"validationQuery\":\"select 1\",\"other\":{\"autoDeserialize\":\"yes\",\"allowUrlInLocalInfile\":\"true\",\"useSSL\":\"true\"}}";
Assertions.assertEquals(expected, JSONUtils.toJsonString(connectionParam)); Assertions.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
} }

12
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessor.java

@ -17,18 +17,22 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource; package org.apache.dolphinscheduler.plugin.datasource.api.datasource;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
public abstract class AbstractDataSourceProcessor implements DataSourceProcessor { public abstract class AbstractDataSourceProcessor implements DataSourceProcessor {
@ -90,6 +94,14 @@ public abstract class AbstractDataSourceProcessor implements DataSourceProcessor
} }
} }
protected Map<String, String> transformOtherParamToMap(String other) {
if (StringUtils.isBlank(other)) {
return Collections.emptyMap();
}
return JSONUtils.parseObject(other, new TypeReference<Map<String, String>>() {
});
}
@Override @Override
public String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) { public String getDatasourceUniqueId(ConnectionParam connectionParam, DbType dbType) {
BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam; BaseConnectionParam baseConnectionParam = (BaseConnectionParam) connectionParam;

9
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/provider/JDBCDataSourceProvider.java

@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import java.sql.Driver; import java.sql.Driver;
@ -59,8 +60,8 @@ public class JDBCDataSourceProvider {
dataSource.setMaximumPoolSize(PropertyUtils.getInt(DataSourceConstants.SPRING_DATASOURCE_MAX_ACTIVE, 50)); dataSource.setMaximumPoolSize(PropertyUtils.getInt(DataSourceConstants.SPRING_DATASOURCE_MAX_ACTIVE, 50));
dataSource.setConnectionTestQuery(properties.getValidationQuery()); dataSource.setConnectionTestQuery(properties.getValidationQuery());
if (properties.getProps() != null) { if (MapUtils.isNotEmpty(properties.getOther())) {
properties.getProps().forEach(dataSource::addDataSourceProperty); properties.getOther().forEach(dataSource::addDataSourceProperty);
} }
logger.info("Creating HikariDataSource pool success."); logger.info("Creating HikariDataSource pool success.");
@ -88,8 +89,8 @@ public class JDBCDataSourceProvider {
isOneSession ? 1 : PropertyUtils.getInt(DataSourceConstants.SPRING_DATASOURCE_MAX_ACTIVE, 50)); isOneSession ? 1 : PropertyUtils.getInt(DataSourceConstants.SPRING_DATASOURCE_MAX_ACTIVE, 50));
dataSource.setConnectionTestQuery(properties.getValidationQuery()); dataSource.setConnectionTestQuery(properties.getValidationQuery());
if (properties.getProps() != null) { if (MapUtils.isNotEmpty(properties.getOther())) {
properties.getProps().forEach(dataSource::addDataSourceProperty); properties.getOther().forEach(dataSource::addDataSourceProperty);
} }
logger.info("Creating OneSession HikariDataSource pool success."); logger.info("Creating OneSession HikariDataSource pool success.");

64
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/AbstractDataSourceProcessorTest.java

@ -17,10 +17,15 @@
package org.apache.dolphinscheduler.plugin.datasource.api.datasource; package org.apache.dolphinscheduler.plugin.datasource.api.datasource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -51,4 +56,63 @@ public class AbstractDataSourceProcessorTest {
other.put("allowLoadLocalInfile", "whatever"); other.put("allowLoadLocalInfile", "whatever");
doThrow(new IllegalArgumentException()).when(mockDataSourceProcessor).checkOther(other); doThrow(new IllegalArgumentException()).when(mockDataSourceProcessor).checkOther(other);
} }
@org.junit.Test
public void transformOtherParamToMap() {
AbstractDataSourceProcessor abstractDataSourceProcessor = new AbstractDataSourceProcessor() {
@Override
public BaseDataSourceParamDTO castDatasourceParamDTO(String paramJson) {
return null;
}
@Override
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) {
return null;
}
@Override
public ConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) {
return null;
}
@Override
public ConnectionParam createConnectionParams(String connectionJson) {
return null;
}
@Override
public String getDatasourceDriver() {
return null;
}
@Override
public String getValidationQuery() {
return null;
}
@Override
public String getJdbcUrl(ConnectionParam connectionParam) {
return null;
}
@Override
public Connection getConnection(ConnectionParam connectionParam) {
return null;
}
@Override
public DbType getDbType() {
return null;
}
@Override
public DataSourceProcessor create() {
return null;
}
};
String other = "{\"useSSL\": \"true\"}";
Map<String, String> stringStringMap = abstractDataSourceProcessor.transformOtherParamToMap(other);
assertEquals("true", stringStringMap.get("useSSL"));
}
} }

24
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-athena/src/main/java/org/apache/dolphinscheduler/plugin/datasource/athena/param/AthenaDataSourceProcessor.java

@ -29,13 +29,11 @@ import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -57,7 +55,7 @@ public class AthenaDataSourceProcessor extends AbstractDataSourceProcessor {
athenaDatasourceParamDTO.setAwsRegion(connectionParams.getAwsRegion()); athenaDatasourceParamDTO.setAwsRegion(connectionParams.getAwsRegion());
athenaDatasourceParamDTO.setDatabase(connectionParams.getDatabase()); athenaDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
athenaDatasourceParamDTO.setUserName(connectionParams.getUser()); athenaDatasourceParamDTO.setUserName(connectionParams.getUser());
athenaDatasourceParamDTO.setOther(this.parseOther(connectionParams.getOther())); athenaDatasourceParamDTO.setOther(connectionParams.getOther());
return athenaDatasourceParamDTO; return athenaDatasourceParamDTO;
} }
@ -72,13 +70,12 @@ public class AthenaDataSourceProcessor extends AbstractDataSourceProcessor {
athenaConnectionParam.setUser(athenaParam.getUserName()); athenaConnectionParam.setUser(athenaParam.getUserName());
athenaConnectionParam.setPassword(PasswordUtils.encodePassword(athenaParam.getPassword())); athenaConnectionParam.setPassword(PasswordUtils.encodePassword(athenaParam.getPassword()));
athenaConnectionParam.setAwsRegion(athenaParam.getAwsRegion()); athenaConnectionParam.setAwsRegion(athenaParam.getAwsRegion());
athenaConnectionParam.setOther(this.transformOther(athenaParam.getOther())); athenaConnectionParam.setOther(athenaParam.getOther());
athenaConnectionParam.setAddress(address); athenaConnectionParam.setAddress(address);
athenaConnectionParam.setJdbcUrl(address); athenaConnectionParam.setJdbcUrl(address);
athenaConnectionParam.setDatabase(athenaParam.getDatabase()); athenaConnectionParam.setDatabase(athenaParam.getDatabase());
athenaConnectionParam.setDriverClassName(this.getDatasourceDriver()); athenaConnectionParam.setDriverClassName(this.getDatasourceDriver());
athenaConnectionParam.setValidationQuery(this.getValidationQuery()); athenaConnectionParam.setValidationQuery(this.getValidationQuery());
athenaConnectionParam.setProps(athenaParam.getOther());
return athenaConnectionParam; return athenaConnectionParam;
} }
@ -101,8 +98,9 @@ public class AthenaDataSourceProcessor extends AbstractDataSourceProcessor {
@Override @Override
public String getJdbcUrl(ConnectionParam connectionParam) { public String getJdbcUrl(ConnectionParam connectionParam) {
AthenaConnectionParam athenaConnectionParam = (AthenaConnectionParam) connectionParam; AthenaConnectionParam athenaConnectionParam = (AthenaConnectionParam) connectionParam;
if (!StringUtils.isEmpty(athenaConnectionParam.getOther())) { if (MapUtils.isNotEmpty(athenaConnectionParam.getOther())) {
return String.format("%s%s", athenaConnectionParam.getJdbcUrl(), athenaConnectionParam.getOther()); return String.format("%s;%s", athenaConnectionParam.getJdbcUrl(),
transformOther(athenaConnectionParam.getOther()));
} }
return athenaConnectionParam.getJdbcUrl(); return athenaConnectionParam.getJdbcUrl();
} }
@ -134,18 +132,6 @@ public class AthenaDataSourceProcessor extends AbstractDataSourceProcessor {
return null; return null;
} }
private Map<String, String> parseOther(String other) {
Map<String, String> otherMap = new LinkedHashMap<>();
if (StringUtils.isEmpty(other)) {
return otherMap;
}
String[] configs = other.split(Constants.SEMICOLON);
for (String config : configs) {
otherMap.put(config.split(Constants.EQUAL_SIGN)[0], config.split(Constants.EQUAL_SIGN)[1]);
}
return otherMap;
}
@Override @Override
protected void checkHost(String host) { protected void checkHost(String host) {
// Do not need to set the host, nothing to do // Do not need to set the host, nothing to do

13
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-athena/src/test/java/org/apache/dolphinscheduler/plugin/datasource/athena/param/AthenaDataSourceProcessorTest.java

@ -31,6 +31,8 @@ import org.mockito.MockedStatic;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import com.google.common.collect.ImmutableMap;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class AthenaDataSourceProcessorTest { public class AthenaDataSourceProcessorTest {
@ -76,9 +78,13 @@ public class AthenaDataSourceProcessorTest {
@Test @Test
public void testGetJdbcUrl() { public void testGetJdbcUrl() {
AthenaConnectionParam athenaConnectionParam = new AthenaConnectionParam(); AthenaConnectionParam athenaConnectionParam = new AthenaConnectionParam();
athenaConnectionParam.setJdbcUrl("jdbc:awsathena://AwsRegion=cn-north-1;"); athenaConnectionParam.setJdbcUrl("jdbc:awsathena://AwsRegion=cn-north-1");
athenaConnectionParam.setOther("LogLevel=6;LogPath=/tmp;"); ImmutableMap<String, String> map = new ImmutableMap.Builder<String, String>()
Assertions.assertEquals("jdbc:awsathena://AwsRegion=cn-north-1;LogLevel=6;LogPath=/tmp;", .put("LogLevel", "6")
.put("LogPath", "/tmp")
.build();
athenaConnectionParam.setOther(map);
Assertions.assertEquals("jdbc:awsathena://AwsRegion=cn-north-1;LogLevel=6;LogPath=/tmp",
this.athenaDataSourceProcessor.getJdbcUrl(athenaConnectionParam)); this.athenaDataSourceProcessor.getJdbcUrl(athenaConnectionParam));
} }
@ -87,7 +93,6 @@ public class AthenaDataSourceProcessorTest {
public void testGetJdbcUrlNoOther() { public void testGetJdbcUrlNoOther() {
AthenaConnectionParam athenaConnectionParam = new AthenaConnectionParam(); AthenaConnectionParam athenaConnectionParam = new AthenaConnectionParam();
athenaConnectionParam.setJdbcUrl("jdbc:awsathena://AwsRegion=cn-north-1;"); athenaConnectionParam.setJdbcUrl("jdbc:awsathena://AwsRegion=cn-north-1;");
athenaConnectionParam.setOther("");
Assertions.assertEquals("jdbc:awsathena://AwsRegion=cn-north-1;", Assertions.assertEquals("jdbc:awsathena://AwsRegion=cn-north-1;",
this.athenaDataSourceProcessor.getJdbcUrl(athenaConnectionParam)); this.athenaDataSourceProcessor.getJdbcUrl(athenaConnectionParam));

30
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-clickhouse/src/main/java/org/apache/dolphinscheduler/plugin/datasource/clickhouse/param/ClickHouseDataSourceProcessor.java

@ -28,12 +28,12 @@ import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.LinkedHashMap; import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
@ -53,7 +53,7 @@ public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor {
ClickHouseDataSourceParamDTO clickHouseDatasourceParamDTO = new ClickHouseDataSourceParamDTO(); ClickHouseDataSourceParamDTO clickHouseDatasourceParamDTO = new ClickHouseDataSourceParamDTO();
clickHouseDatasourceParamDTO.setDatabase(connectionParams.getDatabase()); clickHouseDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
clickHouseDatasourceParamDTO.setUserName(connectionParams.getUser()); clickHouseDatasourceParamDTO.setUserName(connectionParams.getUser());
clickHouseDatasourceParamDTO.setOther(parseOther(connectionParams.getOther())); clickHouseDatasourceParamDTO.setOther(connectionParams.getOther());
String[] hostSeperator = connectionParams.getAddress().split(Constants.DOUBLE_SLASH); String[] hostSeperator = connectionParams.getAddress().split(Constants.DOUBLE_SLASH);
String[] hostPortArray = hostSeperator[hostSeperator.length - 1].split(Constants.COMMA); String[] hostPortArray = hostSeperator[hostSeperator.length - 1].split(Constants.COMMA);
@ -78,8 +78,7 @@ public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor {
clickhouseConnectionParam.setPassword(PasswordUtils.encodePassword(clickHouseParam.getPassword())); clickhouseConnectionParam.setPassword(PasswordUtils.encodePassword(clickHouseParam.getPassword()));
clickhouseConnectionParam.setDriverClassName(getDatasourceDriver()); clickhouseConnectionParam.setDriverClassName(getDatasourceDriver());
clickhouseConnectionParam.setValidationQuery(getValidationQuery()); clickhouseConnectionParam.setValidationQuery(getValidationQuery());
clickhouseConnectionParam.setOther(transformOther(clickHouseParam.getOther())); clickhouseConnectionParam.setOther(clickHouseParam.getOther());
clickhouseConnectionParam.setProps(clickHouseParam.getOther());
return clickhouseConnectionParam; return clickhouseConnectionParam;
} }
@ -102,8 +101,8 @@ public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor {
public String getJdbcUrl(ConnectionParam connectionParam) { public String getJdbcUrl(ConnectionParam connectionParam) {
ClickHouseConnectionParam clickhouseConnectionParam = (ClickHouseConnectionParam) connectionParam; ClickHouseConnectionParam clickhouseConnectionParam = (ClickHouseConnectionParam) connectionParam;
String jdbcUrl = clickhouseConnectionParam.getJdbcUrl(); String jdbcUrl = clickhouseConnectionParam.getJdbcUrl();
if (!StringUtils.isEmpty(clickhouseConnectionParam.getOther())) { if (MapUtils.isNotEmpty(clickhouseConnectionParam.getOther())) {
jdbcUrl = String.format("%s?%s", jdbcUrl, clickhouseConnectionParam.getOther()); jdbcUrl = String.format("%s?%s", jdbcUrl, transformOther(clickhouseConnectionParam.getOther()));
} }
return jdbcUrl; return jdbcUrl;
} }
@ -131,20 +130,9 @@ public class ClickHouseDataSourceProcessor extends AbstractDataSourceProcessor {
if (MapUtils.isEmpty(otherMap)) { if (MapUtils.isEmpty(otherMap)) {
return null; return null;
} }
StringBuilder stringBuilder = new StringBuilder(); List<String> otherList = new ArrayList<>();
otherMap.forEach((key, value) -> stringBuilder.append(String.format("%s=%s%s", key, value, "&"))); otherMap.forEach((key, value) -> otherList.add(String.format("%s=%s", key, value)));
return stringBuilder.toString(); return String.join("&", otherList);
} }
private Map<String, String> parseOther(String other) {
if (other == null) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
String[] configs = other.split("&");
for (String config : configs) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
} }

31
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-db2/src/main/java/org/apache/dolphinscheduler/plugin/datasource/db2/param/Db2DataSourceProcessor.java

@ -29,12 +29,12 @@ import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.LinkedHashMap; import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
@ -53,7 +53,7 @@ public class Db2DataSourceProcessor extends AbstractDataSourceProcessor {
Db2DataSourceParamDTO db2DatasourceParamDTO = new Db2DataSourceParamDTO(); Db2DataSourceParamDTO db2DatasourceParamDTO = new Db2DataSourceParamDTO();
db2DatasourceParamDTO.setDatabase(connectionParams.getDatabase()); db2DatasourceParamDTO.setDatabase(connectionParams.getDatabase());
db2DatasourceParamDTO.setOther(parseOther(connectionParams.getOther())); db2DatasourceParamDTO.setOther(connectionParams.getOther());
db2DatasourceParamDTO.setUserName(db2DatasourceParamDTO.getUserName()); db2DatasourceParamDTO.setUserName(db2DatasourceParamDTO.getUserName());
String[] hostSeperator = connectionParams.getAddress().split(Constants.DOUBLE_SLASH); String[] hostSeperator = connectionParams.getAddress().split(Constants.DOUBLE_SLASH);
@ -78,8 +78,7 @@ public class Db2DataSourceProcessor extends AbstractDataSourceProcessor {
db2ConnectionParam.setPassword(PasswordUtils.encodePassword(db2Param.getPassword())); db2ConnectionParam.setPassword(PasswordUtils.encodePassword(db2Param.getPassword()));
db2ConnectionParam.setDriverClassName(getDatasourceDriver()); db2ConnectionParam.setDriverClassName(getDatasourceDriver());
db2ConnectionParam.setValidationQuery(getValidationQuery()); db2ConnectionParam.setValidationQuery(getValidationQuery());
db2ConnectionParam.setOther(transformOther(db2Param.getOther())); db2ConnectionParam.setOther(db2Param.getOther());
db2ConnectionParam.setProps(db2Param.getOther());
return db2ConnectionParam; return db2ConnectionParam;
} }
@ -97,8 +96,9 @@ public class Db2DataSourceProcessor extends AbstractDataSourceProcessor {
@Override @Override
public String getJdbcUrl(ConnectionParam connectionParam) { public String getJdbcUrl(ConnectionParam connectionParam) {
Db2ConnectionParam db2ConnectionParam = (Db2ConnectionParam) connectionParam; Db2ConnectionParam db2ConnectionParam = (Db2ConnectionParam) connectionParam;
if (!StringUtils.isEmpty(db2ConnectionParam.getOther())) { if (MapUtils.isNotEmpty(db2ConnectionParam.getOther())) {
return String.format("%s;%s", db2ConnectionParam.getJdbcUrl(), db2ConnectionParam.getOther()); return String.format("%s;%s", db2ConnectionParam.getJdbcUrl(),
transformOther(db2ConnectionParam.getOther()));
} }
return db2ConnectionParam.getJdbcUrl(); return db2ConnectionParam.getJdbcUrl();
} }
@ -130,20 +130,9 @@ public class Db2DataSourceProcessor extends AbstractDataSourceProcessor {
if (MapUtils.isEmpty(otherMap)) { if (MapUtils.isEmpty(otherMap)) {
return null; return null;
} }
StringBuilder stringBuilder = new StringBuilder(); List<String> otherList = new ArrayList<>();
otherMap.forEach((key, value) -> stringBuilder.append(String.format("%s=%s%s", key, value, ";"))); otherMap.forEach((key, value) -> otherList.add(String.format("%s=%s", key, value)));
stringBuilder.deleteCharAt(stringBuilder.length() - 1); return String.join(";", otherList);
return stringBuilder.toString();
} }
private Map<String, String> parseOther(String other) {
if (other == null) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
for (String config : other.split("&")) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
} }

7
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-db2/src/test/java/org/apache/dolphinscheduler/plugin/datasource/db2/param/Db2DataSourceProcessorTest.java

@ -31,6 +31,8 @@ import org.mockito.MockedStatic;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import com.google.common.collect.ImmutableMap;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class Db2DataSourceProcessorTest { public class Db2DataSourceProcessorTest {
@ -78,7 +80,10 @@ public class Db2DataSourceProcessorTest {
public void testGetJdbcUrl() { public void testGetJdbcUrl() {
Db2ConnectionParam db2ConnectionParam = new Db2ConnectionParam(); Db2ConnectionParam db2ConnectionParam = new Db2ConnectionParam();
db2ConnectionParam.setJdbcUrl("jdbc:db2://localhost:5142/default"); db2ConnectionParam.setJdbcUrl("jdbc:db2://localhost:5142/default");
db2ConnectionParam.setOther("other=other"); ImmutableMap<String, String> map = new ImmutableMap.Builder<String, String>()
.put("other", "other")
.build();
db2ConnectionParam.setOther(map);
String jdbcUrl = db2DatasourceProcessor.getJdbcUrl(db2ConnectionParam); String jdbcUrl = db2DatasourceProcessor.getJdbcUrl(db2ConnectionParam);
Assertions.assertEquals("jdbc:db2://localhost:5142/default;other=other", jdbcUrl); Assertions.assertEquals("jdbc:db2://localhost:5142/default;other=other", jdbcUrl);
} }

60
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-hive/src/main/java/org/apache/dolphinscheduler/plugin/datasource/hive/param/HiveDataSourceProcessor.java

@ -30,13 +30,13 @@ import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException; import java.io.IOException;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.LinkedHashMap; import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
@ -56,7 +56,7 @@ public class HiveDataSourceProcessor extends AbstractDataSourceProcessor {
hiveDataSourceParamDTO.setDatabase(hiveConnectionParam.getDatabase()); hiveDataSourceParamDTO.setDatabase(hiveConnectionParam.getDatabase());
hiveDataSourceParamDTO.setUserName(hiveConnectionParam.getUser()); hiveDataSourceParamDTO.setUserName(hiveConnectionParam.getUser());
hiveDataSourceParamDTO.setOther(parseOther(hiveConnectionParam.getOther())); hiveDataSourceParamDTO.setOther(hiveConnectionParam.getOther());
hiveDataSourceParamDTO.setLoginUserKeytabUsername(hiveConnectionParam.getLoginUserKeytabUsername()); hiveDataSourceParamDTO.setLoginUserKeytabUsername(hiveConnectionParam.getLoginUserKeytabUsername());
hiveDataSourceParamDTO.setLoginUserKeytabPath(hiveConnectionParam.getLoginUserKeytabPath()); hiveDataSourceParamDTO.setLoginUserKeytabPath(hiveConnectionParam.getLoginUserKeytabPath());
hiveDataSourceParamDTO.setJavaSecurityKrb5Conf(hiveConnectionParam.getJavaSecurityKrb5Conf()); hiveDataSourceParamDTO.setJavaSecurityKrb5Conf(hiveConnectionParam.getJavaSecurityKrb5Conf());
@ -100,8 +100,7 @@ public class HiveDataSourceProcessor extends AbstractDataSourceProcessor {
hiveConnectionParam.setLoginUserKeytabPath(hiveParam.getLoginUserKeytabPath()); hiveConnectionParam.setLoginUserKeytabPath(hiveParam.getLoginUserKeytabPath());
hiveConnectionParam.setLoginUserKeytabUsername(hiveParam.getLoginUserKeytabUsername()); hiveConnectionParam.setLoginUserKeytabUsername(hiveParam.getLoginUserKeytabUsername());
} }
hiveConnectionParam.setOther(transformOther(hiveParam.getOther())); hiveConnectionParam.setOther(hiveParam.getOther());
hiveConnectionParam.setProps(hiveParam.getOther());
return hiveConnectionParam; return hiveConnectionParam;
} }
@ -124,11 +123,10 @@ public class HiveDataSourceProcessor extends AbstractDataSourceProcessor {
public String getJdbcUrl(ConnectionParam connectionParam) { public String getJdbcUrl(ConnectionParam connectionParam) {
HiveConnectionParam hiveConnectionParam = (HiveConnectionParam) connectionParam; HiveConnectionParam hiveConnectionParam = (HiveConnectionParam) connectionParam;
String jdbcUrl = hiveConnectionParam.getJdbcUrl(); String jdbcUrl = hiveConnectionParam.getJdbcUrl();
String otherParams = filterOther(hiveConnectionParam.getOther()); if (MapUtils.isNotEmpty(hiveConnectionParam.getOther())) {
if (StringUtils.isNotEmpty(otherParams) && !"?".equals(otherParams.substring(0, 1))) { return jdbcUrl + "?" + transformOther(hiveConnectionParam.getOther());
jdbcUrl += ";";
} }
return jdbcUrl + otherParams; return jdbcUrl;
} }
@Override @Override
@ -155,47 +153,9 @@ public class HiveDataSourceProcessor extends AbstractDataSourceProcessor {
if (MapUtils.isEmpty(otherMap)) { if (MapUtils.isEmpty(otherMap)) {
return null; return null;
} }
StringBuilder stringBuilder = new StringBuilder(); List<String> otherList = new ArrayList<>();
otherMap.forEach((key, value) -> stringBuilder.append(String.format("%s=%s;", key, value))); otherMap.forEach((key, value) -> otherList.add(String.format("%s=%s", key, value)));
return stringBuilder.toString(); return String.join(";", otherList);
} }
private String filterOther(String otherParams) {
if (StringUtils.isBlank(otherParams)) {
return "";
}
StringBuilder hiveConfListSb = new StringBuilder();
hiveConfListSb.append("?");
StringBuilder sessionVarListSb = new StringBuilder();
String[] otherArray = otherParams.split(";", -1);
for (String conf : otherArray) {
sessionVarListSb.append(conf).append(";");
}
// remove the last ";"
if (sessionVarListSb.length() > 0) {
sessionVarListSb.deleteCharAt(sessionVarListSb.length() - 1);
}
if (hiveConfListSb.length() > 0) {
hiveConfListSb.deleteCharAt(hiveConfListSb.length() - 1);
}
return sessionVarListSb.toString() + hiveConfListSb.toString();
}
private Map<String, String> parseOther(String other) {
if (other == null) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
String[] configs = other.split(";");
for (String config : configs) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
} }

30
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-mysql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/mysql/param/MySQLDataSourceProcessor.java

@ -29,13 +29,13 @@ import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.List;
import java.util.Map; import java.util.Map;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -71,7 +71,7 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
mysqlDatasourceParamDTO.setUserName(connectionParams.getUser()); mysqlDatasourceParamDTO.setUserName(connectionParams.getUser());
mysqlDatasourceParamDTO.setDatabase(connectionParams.getDatabase()); mysqlDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
mysqlDatasourceParamDTO.setOther(parseOther(connectionParams.getOther())); mysqlDatasourceParamDTO.setOther(connectionParams.getOther());
String address = connectionParams.getAddress(); String address = connectionParams.getAddress();
String[] hostSeperator = address.split(Constants.DOUBLE_SLASH); String[] hostSeperator = address.split(Constants.DOUBLE_SLASH);
@ -97,8 +97,7 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
mysqlConnectionParam.setPassword(PasswordUtils.encodePassword(mysqlDatasourceParam.getPassword())); mysqlConnectionParam.setPassword(PasswordUtils.encodePassword(mysqlDatasourceParam.getPassword()));
mysqlConnectionParam.setDriverClassName(getDatasourceDriver()); mysqlConnectionParam.setDriverClassName(getDatasourceDriver());
mysqlConnectionParam.setValidationQuery(getValidationQuery()); mysqlConnectionParam.setValidationQuery(getValidationQuery());
mysqlConnectionParam.setOther(transformOther(mysqlDatasourceParam.getOther())); mysqlConnectionParam.setOther(mysqlDatasourceParam.getOther());
mysqlConnectionParam.setProps(mysqlDatasourceParam.getOther());
return mysqlConnectionParam; return mysqlConnectionParam;
} }
@ -122,8 +121,8 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
public String getJdbcUrl(ConnectionParam connectionParam) { public String getJdbcUrl(ConnectionParam connectionParam) {
MySQLConnectionParam mysqlConnectionParam = (MySQLConnectionParam) connectionParam; MySQLConnectionParam mysqlConnectionParam = (MySQLConnectionParam) connectionParam;
String jdbcUrl = mysqlConnectionParam.getJdbcUrl(); String jdbcUrl = mysqlConnectionParam.getJdbcUrl();
if (!StringUtils.isEmpty(mysqlConnectionParam.getOther())) { if (MapUtils.isNotEmpty(mysqlConnectionParam.getOther())) {
return String.format("%s?%s&%s", jdbcUrl, mysqlConnectionParam.getOther(), APPEND_PARAMS); return String.format("%s?%s&%s", jdbcUrl, transformOther(mysqlConnectionParam.getOther()), APPEND_PARAMS);
} }
return String.format("%s?%s", jdbcUrl, APPEND_PARAMS); return String.format("%s?%s", jdbcUrl, APPEND_PARAMS);
} }
@ -169,9 +168,9 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
if (MapUtils.isEmpty(otherMap)) { if (MapUtils.isEmpty(otherMap)) {
return null; return null;
} }
StringBuilder stringBuilder = new StringBuilder(); List<String> otherList = new ArrayList<>();
otherMap.forEach((key, value) -> stringBuilder.append(String.format("%s=%s&", key, value))); otherMap.forEach((key, value) -> otherList.add(String.format("%s=%s", key, value)));
return stringBuilder.toString(); return String.join("&", otherList);
} }
private static boolean checkKeyIsLegitimate(String key) { private static boolean checkKeyIsLegitimate(String key) {
@ -181,15 +180,4 @@ public class MySQLDataSourceProcessor extends AbstractDataSourceProcessor {
&& !key.contains(ALLOW_URL_IN_LOCAL_IN_FILE_NAME); && !key.contains(ALLOW_URL_IN_LOCAL_IN_FILE_NAME);
} }
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
for (String config : other.split("&")) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
} }

29
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-oracle/src/main/java/org/apache/dolphinscheduler/plugin/datasource/oracle/param/OracleDataSourceProcessor.java

@ -30,13 +30,11 @@ import org.apache.dolphinscheduler.spi.enums.DbConnectType;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -57,7 +55,7 @@ public class OracleDataSourceProcessor extends AbstractDataSourceProcessor {
oracleDatasourceParamDTO.setDatabase(connectionParams.getDatabase()); oracleDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
oracleDatasourceParamDTO.setUserName(connectionParams.getUser()); oracleDatasourceParamDTO.setUserName(connectionParams.getUser());
oracleDatasourceParamDTO.setOther(parseOther(connectionParams.getOther())); oracleDatasourceParamDTO.setOther(connectionParams.getOther());
String hostSeperator = Constants.DOUBLE_SLASH; String hostSeperator = Constants.DOUBLE_SLASH;
if (DbConnectType.ORACLE_SID.equals(connectionParams.connectType)) { if (DbConnectType.ORACLE_SID.equals(connectionParams.connectType)) {
@ -95,8 +93,7 @@ public class OracleDataSourceProcessor extends AbstractDataSourceProcessor {
oracleConnectionParam.setConnectType(oracleParam.getConnectType()); oracleConnectionParam.setConnectType(oracleParam.getConnectType());
oracleConnectionParam.setDriverClassName(getDatasourceDriver()); oracleConnectionParam.setDriverClassName(getDatasourceDriver());
oracleConnectionParam.setValidationQuery(getValidationQuery()); oracleConnectionParam.setValidationQuery(getValidationQuery());
oracleConnectionParam.setOther(transformOther(oracleParam.getOther())); oracleConnectionParam.setOther(oracleParam.getOther());
oracleConnectionParam.setProps(oracleParam.getOther());
return oracleConnectionParam; return oracleConnectionParam;
} }
@ -119,8 +116,9 @@ public class OracleDataSourceProcessor extends AbstractDataSourceProcessor {
@Override @Override
public String getJdbcUrl(ConnectionParam connectionParam) { public String getJdbcUrl(ConnectionParam connectionParam) {
OracleConnectionParam oracleConnectionParam = (OracleConnectionParam) connectionParam; OracleConnectionParam oracleConnectionParam = (OracleConnectionParam) connectionParam;
if (!StringUtils.isEmpty(oracleConnectionParam.getOther())) { if (MapUtils.isNotEmpty(oracleConnectionParam.getOther())) {
return String.format("%s?%s", oracleConnectionParam.getJdbcUrl(), oracleConnectionParam.getOther()); return String.format("%s?%s", oracleConnectionParam.getJdbcUrl(),
transformOther(oracleConnectionParam.getOther()));
} }
return oracleConnectionParam.getJdbcUrl(); return oracleConnectionParam.getJdbcUrl();
} }
@ -147,20 +145,9 @@ public class OracleDataSourceProcessor extends AbstractDataSourceProcessor {
if (MapUtils.isEmpty(otherMap)) { if (MapUtils.isEmpty(otherMap)) {
return null; return null;
} }
List<String> list = new ArrayList<>(); List<String> otherList = new ArrayList<>();
otherMap.forEach((key, value) -> list.add(String.format("%s=%s", key, value))); otherMap.forEach((key, value) -> otherList.add(String.format("%s=%s", key, value)));
return String.join("&", list); return String.join("&", otherList);
} }
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
String[] configs = other.split("&");
for (String config : configs) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
} }

7
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-oracle/src/test/java/org/apache/dolphinscheduler/plugin/datasource/oracle/param/OracleDataSourceProcessorTest.java

@ -32,6 +32,8 @@ import org.mockito.MockedStatic;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import com.google.common.collect.ImmutableMap;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class OracleDataSourceProcessorTest { public class OracleDataSourceProcessorTest {
@ -81,7 +83,10 @@ public class OracleDataSourceProcessorTest {
public void testGetJdbcUrl() { public void testGetJdbcUrl() {
OracleConnectionParam oracleConnectionParam = new OracleConnectionParam(); OracleConnectionParam oracleConnectionParam = new OracleConnectionParam();
oracleConnectionParam.setJdbcUrl("jdbc:oracle:thin:@localhost:3308:default"); oracleConnectionParam.setJdbcUrl("jdbc:oracle:thin:@localhost:3308:default");
oracleConnectionParam.setOther("other=other"); ImmutableMap<String, String> map = new ImmutableMap.Builder<String, String>()
.put("other", "other")
.build();
oracleConnectionParam.setOther(map);
Assertions.assertEquals("jdbc:oracle:thin:@localhost:3308:default?other=other", Assertions.assertEquals("jdbc:oracle:thin:@localhost:3308:default?other=other",
oracleDatasourceProcessor.getJdbcUrl(oracleConnectionParam)); oracleDatasourceProcessor.getJdbcUrl(oracleConnectionParam));
} }

31
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-postgresql/src/main/java/org/apache/dolphinscheduler/plugin/datasource/postgresql/param/PostgreSQLDataSourceProcessor.java

@ -33,11 +33,11 @@ import org.apache.commons.collections4.MapUtils;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.LinkedHashMap; import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
import com.google.common.base.Strings;
@AutoService(DataSourceProcessor.class) @AutoService(DataSourceProcessor.class)
public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor { public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
@ -53,7 +53,7 @@ public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
PostgreSQLDataSourceParamDTO postgreSqlDatasourceParamDTO = new PostgreSQLDataSourceParamDTO(); PostgreSQLDataSourceParamDTO postgreSqlDatasourceParamDTO = new PostgreSQLDataSourceParamDTO();
postgreSqlDatasourceParamDTO.setDatabase(connectionParams.getDatabase()); postgreSqlDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
postgreSqlDatasourceParamDTO.setUserName(connectionParams.getUser()); postgreSqlDatasourceParamDTO.setUserName(connectionParams.getUser());
postgreSqlDatasourceParamDTO.setOther(parseOther(connectionParams.getOther())); postgreSqlDatasourceParamDTO.setOther(connectionParams.getOther());
String address = connectionParams.getAddress(); String address = connectionParams.getAddress();
String[] hostSeperator = address.split(Constants.DOUBLE_SLASH); String[] hostSeperator = address.split(Constants.DOUBLE_SLASH);
@ -79,8 +79,7 @@ public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
postgreSqlConnectionParam.setPassword(PasswordUtils.encodePassword(postgreSqlParam.getPassword())); postgreSqlConnectionParam.setPassword(PasswordUtils.encodePassword(postgreSqlParam.getPassword()));
postgreSqlConnectionParam.setDriverClassName(getDatasourceDriver()); postgreSqlConnectionParam.setDriverClassName(getDatasourceDriver());
postgreSqlConnectionParam.setValidationQuery(getValidationQuery()); postgreSqlConnectionParam.setValidationQuery(getValidationQuery());
postgreSqlConnectionParam.setOther(transformOther(postgreSqlParam.getOther())); postgreSqlConnectionParam.setOther(postgreSqlParam.getOther());
postgreSqlConnectionParam.setProps(postgreSqlParam.getOther());
return postgreSqlConnectionParam; return postgreSqlConnectionParam;
} }
@ -103,8 +102,9 @@ public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
@Override @Override
public String getJdbcUrl(ConnectionParam connectionParam) { public String getJdbcUrl(ConnectionParam connectionParam) {
PostgreSQLConnectionParam postgreSqlConnectionParam = (PostgreSQLConnectionParam) connectionParam; PostgreSQLConnectionParam postgreSqlConnectionParam = (PostgreSQLConnectionParam) connectionParam;
if (!Strings.isNullOrEmpty(postgreSqlConnectionParam.getOther())) { if (MapUtils.isNotEmpty(postgreSqlConnectionParam.getOther())) {
return String.format("%s?%s", postgreSqlConnectionParam.getJdbcUrl(), postgreSqlConnectionParam.getOther()); return String.format("%s?%s", postgreSqlConnectionParam.getJdbcUrl(),
transformOther(postgreSqlConnectionParam.getOther()));
} }
return postgreSqlConnectionParam.getJdbcUrl(); return postgreSqlConnectionParam.getJdbcUrl();
} }
@ -132,20 +132,9 @@ public class PostgreSQLDataSourceProcessor extends AbstractDataSourceProcessor {
if (MapUtils.isEmpty(otherMap)) { if (MapUtils.isEmpty(otherMap)) {
return null; return null;
} }
StringBuilder stringBuilder = new StringBuilder(); List<String> otherList = new ArrayList<>();
otherMap.forEach((key, value) -> stringBuilder.append(String.format("%s=%s&", key, value))); otherMap.forEach((key, value) -> otherList.add(String.format("%s=%s", key, value)));
return stringBuilder.toString(); return String.join("&", otherList);
} }
private Map<String, String> parseOther(String other) {
if (Strings.isNullOrEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
for (String config : other.split("&")) {
String[] split = config.split("=");
otherMap.put(split[0], split[1]);
}
return otherMap;
}
} }

9
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-postgresql/src/test/java/org/apache/dolphinscheduler/plugin/datasource/postgresql/param/PostgreSQLDataSourceProcessorTest.java

@ -33,6 +33,8 @@ import org.mockito.MockedStatic;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import com.google.common.collect.ImmutableMap;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class PostgreSQLDataSourceProcessorTest { public class PostgreSQLDataSourceProcessorTest {
@ -79,10 +81,13 @@ public class PostgreSQLDataSourceProcessorTest {
public void testGetJdbcUrl() { public void testGetJdbcUrl() {
PostgreSQLConnectionParam postgreSqlConnectionParam = new PostgreSQLConnectionParam(); PostgreSQLConnectionParam postgreSqlConnectionParam = new PostgreSQLConnectionParam();
postgreSqlConnectionParam.setJdbcUrl("jdbc:postgresql://localhost:3308/default"); postgreSqlConnectionParam.setJdbcUrl("jdbc:postgresql://localhost:3308/default");
postgreSqlConnectionParam.setOther("other"); ImmutableMap<String, String> map = new ImmutableMap.Builder<String, String>()
.put("other", "other")
.build();
postgreSqlConnectionParam.setOther(map);
String jdbcUrl = postgreSqlDatasourceProcessor.getJdbcUrl(postgreSqlConnectionParam); String jdbcUrl = postgreSqlDatasourceProcessor.getJdbcUrl(postgreSqlConnectionParam);
Assertions.assertEquals("jdbc:postgresql://localhost:3308/default?other", jdbcUrl); Assertions.assertEquals("jdbc:postgresql://localhost:3308/default?other=other", jdbcUrl);
} }

23
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-presto/src/main/java/org/apache/dolphinscheduler/plugin/datasource/presto/param/PrestoDataSourceProcessor.java

@ -29,13 +29,11 @@ import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -61,7 +59,7 @@ public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor {
prestoDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]); prestoDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]);
prestoDatasourceParamDTO.setDatabase(connectionParams.getDatabase()); prestoDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
prestoDatasourceParamDTO.setUserName(connectionParams.getUser()); prestoDatasourceParamDTO.setUserName(connectionParams.getUser());
prestoDatasourceParamDTO.setOther(parseOther(connectionParams.getOther())); prestoDatasourceParamDTO.setOther(connectionParams.getOther());
return prestoDatasourceParamDTO; return prestoDatasourceParamDTO;
} }
@ -76,13 +74,12 @@ public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor {
PrestoConnectionParam prestoConnectionParam = new PrestoConnectionParam(); PrestoConnectionParam prestoConnectionParam = new PrestoConnectionParam();
prestoConnectionParam.setUser(prestoParam.getUserName()); prestoConnectionParam.setUser(prestoParam.getUserName());
prestoConnectionParam.setPassword(PasswordUtils.encodePassword(prestoParam.getPassword())); prestoConnectionParam.setPassword(PasswordUtils.encodePassword(prestoParam.getPassword()));
prestoConnectionParam.setOther(transformOther(prestoParam.getOther())); prestoConnectionParam.setOther(prestoParam.getOther());
prestoConnectionParam.setAddress(address); prestoConnectionParam.setAddress(address);
prestoConnectionParam.setJdbcUrl(jdbcUrl); prestoConnectionParam.setJdbcUrl(jdbcUrl);
prestoConnectionParam.setDatabase(prestoParam.getDatabase()); prestoConnectionParam.setDatabase(prestoParam.getDatabase());
prestoConnectionParam.setDriverClassName(getDatasourceDriver()); prestoConnectionParam.setDriverClassName(getDatasourceDriver());
prestoConnectionParam.setValidationQuery(getValidationQuery()); prestoConnectionParam.setValidationQuery(getValidationQuery());
prestoConnectionParam.setProps(prestoParam.getOther());
return prestoConnectionParam; return prestoConnectionParam;
} }
@ -105,8 +102,9 @@ public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor {
@Override @Override
public String getJdbcUrl(ConnectionParam connectionParam) { public String getJdbcUrl(ConnectionParam connectionParam) {
PrestoConnectionParam prestoConnectionParam = (PrestoConnectionParam) connectionParam; PrestoConnectionParam prestoConnectionParam = (PrestoConnectionParam) connectionParam;
if (!StringUtils.isEmpty(prestoConnectionParam.getOther())) { if (MapUtils.isNotEmpty(prestoConnectionParam.getOther())) {
return String.format("%s?%s", prestoConnectionParam.getJdbcUrl(), prestoConnectionParam.getOther()); return String.format("%s?%s", prestoConnectionParam.getJdbcUrl(),
transformOther(prestoConnectionParam.getOther()));
} }
return prestoConnectionParam.getJdbcUrl(); return prestoConnectionParam.getJdbcUrl();
} }
@ -138,15 +136,4 @@ public class PrestoDataSourceProcessor extends AbstractDataSourceProcessor {
return null; return null;
} }
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
String[] configs = other.split("&");
for (String config : configs) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
} }

3
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-presto/src/test/java/org/apache/dolphinscheduler/plugin/datasource/presto/param/PrestoDataSourceProcessorTest.java

@ -77,8 +77,7 @@ public class PrestoDataSourceProcessorTest {
public void testGetJdbcUrl() { public void testGetJdbcUrl() {
PrestoConnectionParam prestoConnectionParam = new PrestoConnectionParam(); PrestoConnectionParam prestoConnectionParam = new PrestoConnectionParam();
prestoConnectionParam.setJdbcUrl("jdbc:postgresql://localhost:1234/default"); prestoConnectionParam.setJdbcUrl("jdbc:postgresql://localhost:1234/default");
prestoConnectionParam.setOther("other"); Assertions.assertEquals("jdbc:postgresql://localhost:1234/default",
Assertions.assertEquals("jdbc:postgresql://localhost:1234/default?other",
prestoDatasourceProcessor.getJdbcUrl(prestoConnectionParam)); prestoDatasourceProcessor.getJdbcUrl(prestoConnectionParam));
} }

23
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/main/java/org/apache/dolphinscheduler/plugin/datasource/redshift/param/RedshiftDataSourceProcessor.java

@ -29,13 +29,11 @@ import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -61,7 +59,7 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
redshiftDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]); redshiftDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]);
redshiftDatasourceParamDTO.setDatabase(connectionParams.getDatabase()); redshiftDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
redshiftDatasourceParamDTO.setUserName(connectionParams.getUser()); redshiftDatasourceParamDTO.setUserName(connectionParams.getUser());
redshiftDatasourceParamDTO.setOther(parseOther(connectionParams.getOther())); redshiftDatasourceParamDTO.setOther(connectionParams.getOther());
return redshiftDatasourceParamDTO; return redshiftDatasourceParamDTO;
} }
@ -77,13 +75,12 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
RedshiftConnectionParam redshiftConnectionParam = new RedshiftConnectionParam(); RedshiftConnectionParam redshiftConnectionParam = new RedshiftConnectionParam();
redshiftConnectionParam.setUser(redshiftParam.getUserName()); redshiftConnectionParam.setUser(redshiftParam.getUserName());
redshiftConnectionParam.setPassword(PasswordUtils.encodePassword(redshiftParam.getPassword())); redshiftConnectionParam.setPassword(PasswordUtils.encodePassword(redshiftParam.getPassword()));
redshiftConnectionParam.setOther(transformOther(redshiftParam.getOther())); redshiftConnectionParam.setOther(redshiftParam.getOther());
redshiftConnectionParam.setAddress(address); redshiftConnectionParam.setAddress(address);
redshiftConnectionParam.setJdbcUrl(jdbcUrl); redshiftConnectionParam.setJdbcUrl(jdbcUrl);
redshiftConnectionParam.setDatabase(redshiftParam.getDatabase()); redshiftConnectionParam.setDatabase(redshiftParam.getDatabase());
redshiftConnectionParam.setDriverClassName(getDatasourceDriver()); redshiftConnectionParam.setDriverClassName(getDatasourceDriver());
redshiftConnectionParam.setValidationQuery(getValidationQuery()); redshiftConnectionParam.setValidationQuery(getValidationQuery());
redshiftConnectionParam.setProps(redshiftParam.getOther());
return redshiftConnectionParam; return redshiftConnectionParam;
} }
@ -106,8 +103,9 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
@Override @Override
public String getJdbcUrl(ConnectionParam connectionParam) { public String getJdbcUrl(ConnectionParam connectionParam) {
RedshiftConnectionParam redshiftConnectionParam = (RedshiftConnectionParam) connectionParam; RedshiftConnectionParam redshiftConnectionParam = (RedshiftConnectionParam) connectionParam;
if (!StringUtils.isEmpty(redshiftConnectionParam.getOther())) { if (MapUtils.isNotEmpty(redshiftConnectionParam.getOther())) {
return String.format("%s?%s", redshiftConnectionParam.getJdbcUrl(), redshiftConnectionParam.getOther()); return String.format("%s?%s", redshiftConnectionParam.getJdbcUrl(),
transformOther(redshiftConnectionParam.getOther()));
} }
return redshiftConnectionParam.getJdbcUrl(); return redshiftConnectionParam.getJdbcUrl();
} }
@ -139,15 +137,4 @@ public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor {
return null; return null;
} }
private Map<String, String> parseOther(String other) {
Map<String, String> otherMap = new LinkedHashMap<>();
if (StringUtils.isEmpty(other)) {
return otherMap;
}
String[] configs = other.split(Constants.SEMICOLON);
for (String config : configs) {
otherMap.put(config.split(Constants.EQUAL_SIGN)[0], config.split(Constants.EQUAL_SIGN)[1]);
}
return otherMap;
}
} }

8
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-redshift/src/test/java/org/apache/dolphinscheduler/plugin/datasource/redshift/param/RedshiftDataSourceProcessorTest.java

@ -31,6 +31,8 @@ import org.mockito.MockedStatic;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import com.google.common.collect.ImmutableMap;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class RedshiftDataSourceProcessorTest { public class RedshiftDataSourceProcessorTest {
@ -78,7 +80,11 @@ public class RedshiftDataSourceProcessorTest {
public void testGetJdbcUrl() { public void testGetJdbcUrl() {
RedshiftConnectionParam redshiftConnectionParam = new RedshiftConnectionParam(); RedshiftConnectionParam redshiftConnectionParam = new RedshiftConnectionParam();
redshiftConnectionParam.setJdbcUrl("jdbc:redshift://localhost:5439/default"); redshiftConnectionParam.setJdbcUrl("jdbc:redshift://localhost:5439/default");
redshiftConnectionParam.setOther("DSILogLevel=6;defaultRowFetchSize=100"); ImmutableMap<String, String> map = new ImmutableMap.Builder<String, String>()
.put("DSILogLevel", "6")
.put("defaultRowFetchSize", "100")
.build();
redshiftConnectionParam.setOther(map);
Assertions.assertEquals("jdbc:redshift://localhost:5439/default?DSILogLevel=6;defaultRowFetchSize=100", Assertions.assertEquals("jdbc:redshift://localhost:5439/default?DSILogLevel=6;defaultRowFetchSize=100",
redshiftDatasourceProcessor.getJdbcUrl(redshiftConnectionParam)); redshiftDatasourceProcessor.getJdbcUrl(redshiftConnectionParam));

23
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-spark/src/main/java/org/apache/dolphinscheduler/plugin/datasource/spark/param/SparkDataSourceProcessor.java

@ -30,14 +30,12 @@ import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException; import java.io.IOException;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -59,7 +57,7 @@ public class SparkDataSourceProcessor extends AbstractDataSourceProcessor {
SparkDataSourceParamDTO sparkDatasourceParamDTO = new SparkDataSourceParamDTO(); SparkDataSourceParamDTO sparkDatasourceParamDTO = new SparkDataSourceParamDTO();
sparkDatasourceParamDTO.setDatabase(connectionParams.getDatabase()); sparkDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
sparkDatasourceParamDTO.setUserName(connectionParams.getUser()); sparkDatasourceParamDTO.setUserName(connectionParams.getUser());
sparkDatasourceParamDTO.setOther(parseOther(connectionParams.getOther())); sparkDatasourceParamDTO.setOther(connectionParams.getOther());
sparkDatasourceParamDTO.setJavaSecurityKrb5Conf(connectionParams.getJavaSecurityKrb5Conf()); sparkDatasourceParamDTO.setJavaSecurityKrb5Conf(connectionParams.getJavaSecurityKrb5Conf());
sparkDatasourceParamDTO.setLoginUserKeytabPath(connectionParams.getLoginUserKeytabPath()); sparkDatasourceParamDTO.setLoginUserKeytabPath(connectionParams.getLoginUserKeytabPath());
sparkDatasourceParamDTO.setLoginUserKeytabUsername(connectionParams.getLoginUserKeytabUsername()); sparkDatasourceParamDTO.setLoginUserKeytabUsername(connectionParams.getLoginUserKeytabUsername());
@ -92,13 +90,12 @@ public class SparkDataSourceProcessor extends AbstractDataSourceProcessor {
SparkConnectionParam sparkConnectionParam = new SparkConnectionParam(); SparkConnectionParam sparkConnectionParam = new SparkConnectionParam();
sparkConnectionParam.setPassword(PasswordUtils.encodePassword(sparkDatasourceParam.getPassword())); sparkConnectionParam.setPassword(PasswordUtils.encodePassword(sparkDatasourceParam.getPassword()));
sparkConnectionParam.setUser(sparkDatasourceParam.getUserName()); sparkConnectionParam.setUser(sparkDatasourceParam.getUserName());
sparkConnectionParam.setOther(transformOther(sparkDatasourceParam.getOther())); sparkConnectionParam.setOther(sparkDatasourceParam.getOther());
sparkConnectionParam.setDatabase(sparkDatasourceParam.getDatabase()); sparkConnectionParam.setDatabase(sparkDatasourceParam.getDatabase());
sparkConnectionParam.setAddress(address.toString()); sparkConnectionParam.setAddress(address.toString());
sparkConnectionParam.setJdbcUrl(jdbcUrl); sparkConnectionParam.setJdbcUrl(jdbcUrl);
sparkConnectionParam.setDriverClassName(getDatasourceDriver()); sparkConnectionParam.setDriverClassName(getDatasourceDriver());
sparkConnectionParam.setValidationQuery(getValidationQuery()); sparkConnectionParam.setValidationQuery(getValidationQuery());
sparkConnectionParam.setProps(sparkDatasourceParam.getOther());
if (CommonUtils.getKerberosStartupState()) { if (CommonUtils.getKerberosStartupState()) {
sparkConnectionParam.setPrincipal(sparkDatasourceParam.getPrincipal()); sparkConnectionParam.setPrincipal(sparkDatasourceParam.getPrincipal());
@ -128,8 +125,9 @@ public class SparkDataSourceProcessor extends AbstractDataSourceProcessor {
@Override @Override
public String getJdbcUrl(ConnectionParam connectionParam) { public String getJdbcUrl(ConnectionParam connectionParam) {
SparkConnectionParam sparkConnectionParam = (SparkConnectionParam) connectionParam; SparkConnectionParam sparkConnectionParam = (SparkConnectionParam) connectionParam;
if (!StringUtils.isEmpty(sparkConnectionParam.getOther())) { if (MapUtils.isNotEmpty(sparkConnectionParam.getOther())) {
return String.format("%s;%s", sparkConnectionParam.getJdbcUrl(), sparkConnectionParam.getOther()); return String.format("%s;%s", sparkConnectionParam.getJdbcUrl(),
transformOther(sparkConnectionParam.getOther()));
} }
return sparkConnectionParam.getJdbcUrl(); return sparkConnectionParam.getJdbcUrl();
} }
@ -163,15 +161,4 @@ public class SparkDataSourceProcessor extends AbstractDataSourceProcessor {
return String.join(";", stringBuilder); return String.join(";", stringBuilder);
} }
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
String[] configs = other.split(";");
for (String config : configs) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
} }

3
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-spark/src/test/java/org/apache/dolphinscheduler/plugin/datasource/spark/param/SparkDataSourceProcessorTest.java

@ -83,8 +83,7 @@ public class SparkDataSourceProcessorTest {
public void testGetJdbcUrl() { public void testGetJdbcUrl() {
SparkConnectionParam sparkConnectionParam = new SparkConnectionParam(); SparkConnectionParam sparkConnectionParam = new SparkConnectionParam();
sparkConnectionParam.setJdbcUrl("jdbc:hive2://localhost1:1234,localhost2:1234/default"); sparkConnectionParam.setJdbcUrl("jdbc:hive2://localhost1:1234,localhost2:1234/default");
sparkConnectionParam.setOther("other"); Assertions.assertEquals("jdbc:hive2://localhost1:1234,localhost2:1234/default",
Assertions.assertEquals("jdbc:hive2://localhost1:1234,localhost2:1234/default;other",
sparkDatasourceProcessor.getJdbcUrl(sparkConnectionParam)); sparkDatasourceProcessor.getJdbcUrl(sparkConnectionParam));
} }

22
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sqlserver/src/main/java/org/apache/dolphinscheduler/plugin/datasource/sqlserver/param/SQLServerDataSourceProcessor.java

@ -29,12 +29,10 @@ import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
@ -56,7 +54,7 @@ public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor {
SQLServerDataSourceParamDTO sqlServerDatasourceParamDTO = new SQLServerDataSourceParamDTO(); SQLServerDataSourceParamDTO sqlServerDatasourceParamDTO = new SQLServerDataSourceParamDTO();
sqlServerDatasourceParamDTO.setDatabase(connectionParams.getDatabase()); sqlServerDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
sqlServerDatasourceParamDTO.setUserName(connectionParams.getUser()); sqlServerDatasourceParamDTO.setUserName(connectionParams.getUser());
sqlServerDatasourceParamDTO.setOther(parseOther(connectionParams.getOther())); sqlServerDatasourceParamDTO.setOther(connectionParams.getOther());
sqlServerDatasourceParamDTO.setPort(Integer.parseInt(hostPortArray[0].split(Constants.COLON)[1])); sqlServerDatasourceParamDTO.setPort(Integer.parseInt(hostPortArray[0].split(Constants.COLON)[1]));
sqlServerDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]); sqlServerDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]);
return sqlServerDatasourceParamDTO; return sqlServerDatasourceParamDTO;
@ -74,12 +72,11 @@ public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor {
sqlServerConnectionParam.setAddress(address); sqlServerConnectionParam.setAddress(address);
sqlServerConnectionParam.setDatabase(sqlServerParam.getDatabase()); sqlServerConnectionParam.setDatabase(sqlServerParam.getDatabase());
sqlServerConnectionParam.setJdbcUrl(jdbcUrl); sqlServerConnectionParam.setJdbcUrl(jdbcUrl);
sqlServerConnectionParam.setOther(transformOther(sqlServerParam.getOther())); sqlServerConnectionParam.setOther(sqlServerParam.getOther());
sqlServerConnectionParam.setUser(sqlServerParam.getUserName()); sqlServerConnectionParam.setUser(sqlServerParam.getUserName());
sqlServerConnectionParam.setPassword(PasswordUtils.encodePassword(sqlServerParam.getPassword())); sqlServerConnectionParam.setPassword(PasswordUtils.encodePassword(sqlServerParam.getPassword()));
sqlServerConnectionParam.setDriverClassName(getDatasourceDriver()); sqlServerConnectionParam.setDriverClassName(getDatasourceDriver());
sqlServerConnectionParam.setValidationQuery(getValidationQuery()); sqlServerConnectionParam.setValidationQuery(getValidationQuery());
sqlServerConnectionParam.setProps(sqlServerParam.getOther());
return sqlServerConnectionParam; return sqlServerConnectionParam;
} }
@ -102,8 +99,9 @@ public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor {
public String getJdbcUrl(ConnectionParam connectionParam) { public String getJdbcUrl(ConnectionParam connectionParam) {
SQLServerConnectionParam sqlServerConnectionParam = (SQLServerConnectionParam) connectionParam; SQLServerConnectionParam sqlServerConnectionParam = (SQLServerConnectionParam) connectionParam;
if (!StringUtils.isEmpty(sqlServerConnectionParam.getOther())) { if (MapUtils.isNotEmpty(sqlServerConnectionParam.getOther())) {
return String.format("%s;%s", sqlServerConnectionParam.getJdbcUrl(), sqlServerConnectionParam.getOther()); return String.format("%s;%s", sqlServerConnectionParam.getJdbcUrl(),
transformOther(sqlServerConnectionParam.getOther()));
} }
return sqlServerConnectionParam.getJdbcUrl(); return sqlServerConnectionParam.getJdbcUrl();
} }
@ -135,14 +133,4 @@ public class SQLServerDataSourceProcessor extends AbstractDataSourceProcessor {
return stringBuilder.toString(); return stringBuilder.toString();
} }
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
for (String config : other.split(";")) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
} }

3
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-sqlserver/src/test/java/org/apache/dolphinscheduler/plugin/datasource/sqlserver/param/SQLServerDataSourceProcessorTest.java

@ -81,8 +81,7 @@ public class SQLServerDataSourceProcessorTest {
public void testGetJdbcUrl() { public void testGetJdbcUrl() {
SQLServerConnectionParam sqlServerConnectionParam = new SQLServerConnectionParam(); SQLServerConnectionParam sqlServerConnectionParam = new SQLServerConnectionParam();
sqlServerConnectionParam.setJdbcUrl("jdbc:sqlserver://localhost:1234;databaseName=default"); sqlServerConnectionParam.setJdbcUrl("jdbc:sqlserver://localhost:1234;databaseName=default");
sqlServerConnectionParam.setOther("other"); Assertions.assertEquals("jdbc:sqlserver://localhost:1234;databaseName=default",
Assertions.assertEquals("jdbc:sqlserver://localhost:1234;databaseName=default;other",
sqlServerDatasourceProcessor.getJdbcUrl(sqlServerConnectionParam)); sqlServerDatasourceProcessor.getJdbcUrl(sqlServerConnectionParam));
} }

23
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-trino/src/main/java/org/apache/dolphinscheduler/plugin/datasource/trino/param/TrinoDataSourceProcessor.java

@ -29,13 +29,11 @@ import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType; import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -62,7 +60,7 @@ public class TrinoDataSourceProcessor extends AbstractDataSourceProcessor {
TrinoDatasourceParamDTO.setPort(Integer.valueOf(split[1])); TrinoDatasourceParamDTO.setPort(Integer.valueOf(split[1]));
TrinoDatasourceParamDTO.setDatabase(connectionParams.getDatabase()); TrinoDatasourceParamDTO.setDatabase(connectionParams.getDatabase());
TrinoDatasourceParamDTO.setUserName(connectionParams.getUser()); TrinoDatasourceParamDTO.setUserName(connectionParams.getUser());
TrinoDatasourceParamDTO.setOther(parseOther(connectionParams.getOther())); TrinoDatasourceParamDTO.setOther(connectionParams.getOther());
return TrinoDatasourceParamDTO; return TrinoDatasourceParamDTO;
} }
@ -77,13 +75,12 @@ public class TrinoDataSourceProcessor extends AbstractDataSourceProcessor {
TrinoConnectionParam TrinoConnectionParam = new TrinoConnectionParam(); TrinoConnectionParam TrinoConnectionParam = new TrinoConnectionParam();
TrinoConnectionParam.setUser(TrinoParam.getUserName()); TrinoConnectionParam.setUser(TrinoParam.getUserName());
TrinoConnectionParam.setPassword(PasswordUtils.encodePassword(TrinoParam.getPassword())); TrinoConnectionParam.setPassword(PasswordUtils.encodePassword(TrinoParam.getPassword()));
TrinoConnectionParam.setOther(transformOther(TrinoParam.getOther())); TrinoConnectionParam.setOther(TrinoParam.getOther());
TrinoConnectionParam.setAddress(address); TrinoConnectionParam.setAddress(address);
TrinoConnectionParam.setJdbcUrl(jdbcUrl); TrinoConnectionParam.setJdbcUrl(jdbcUrl);
TrinoConnectionParam.setDatabase(TrinoParam.getDatabase()); TrinoConnectionParam.setDatabase(TrinoParam.getDatabase());
TrinoConnectionParam.setDriverClassName(getDatasourceDriver()); TrinoConnectionParam.setDriverClassName(getDatasourceDriver());
TrinoConnectionParam.setValidationQuery(getValidationQuery()); TrinoConnectionParam.setValidationQuery(getValidationQuery());
TrinoConnectionParam.setProps(TrinoParam.getOther());
return TrinoConnectionParam; return TrinoConnectionParam;
} }
@ -106,8 +103,9 @@ public class TrinoDataSourceProcessor extends AbstractDataSourceProcessor {
@Override @Override
public String getJdbcUrl(ConnectionParam connectionParam) { public String getJdbcUrl(ConnectionParam connectionParam) {
TrinoConnectionParam TrinoConnectionParam = (TrinoConnectionParam) connectionParam; TrinoConnectionParam TrinoConnectionParam = (TrinoConnectionParam) connectionParam;
if (StringUtils.isNotEmpty(TrinoConnectionParam.getOther())) { if (MapUtils.isNotEmpty(TrinoConnectionParam.getOther())) {
return String.format("%s?%s", TrinoConnectionParam.getJdbcUrl(), TrinoConnectionParam.getOther()); return String.format("%s?%s", TrinoConnectionParam.getJdbcUrl(),
transformOther(TrinoConnectionParam.getOther()));
} }
return TrinoConnectionParam.getJdbcUrl(); return TrinoConnectionParam.getJdbcUrl();
} }
@ -139,15 +137,4 @@ public class TrinoDataSourceProcessor extends AbstractDataSourceProcessor {
return null; return null;
} }
private Map<String, String> parseOther(String other) {
if (StringUtils.isEmpty(other)) {
return null;
}
Map<String, String> otherMap = new LinkedHashMap<>();
String[] configs = other.split("&");
for (String config : configs) {
otherMap.put(config.split("=")[0], config.split("=")[1]);
}
return otherMap;
}
} }

3
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-trino/src/test/java/org/apache/dolphinscheduler/plugin/datasource/trino/param/TrinoDataSourceProcessorTest.java

@ -77,8 +77,7 @@ public class TrinoDataSourceProcessorTest {
public void testGetJdbcUrl() { public void testGetJdbcUrl() {
TrinoConnectionParam TrinoConnectionParam = new TrinoConnectionParam(); TrinoConnectionParam TrinoConnectionParam = new TrinoConnectionParam();
TrinoConnectionParam.setJdbcUrl("jdbc:postgresql://localhost:8080/default"); TrinoConnectionParam.setJdbcUrl("jdbc:postgresql://localhost:8080/default");
TrinoConnectionParam.setOther("other"); Assertions.assertEquals("jdbc:postgresql://localhost:8080/default",
Assertions.assertEquals("jdbc:postgresql://localhost:8080/default?other",
TrinoDatasourceProcessor.getJdbcUrl(TrinoConnectionParam)); TrinoDatasourceProcessor.getJdbcUrl(TrinoConnectionParam));
} }

87
dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/datasource/BaseConnectionParam.java

@ -17,12 +17,14 @@
package org.apache.dolphinscheduler.spi.datasource; package org.apache.dolphinscheduler.spi.datasource;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonInclude.Include;
@Data
@JsonInclude(Include.NON_NULL) @JsonInclude(Include.NON_NULL)
public abstract class BaseConnectionParam implements ConnectionParam { public abstract class BaseConnectionParam implements ConnectionParam {
@ -42,87 +44,6 @@ public abstract class BaseConnectionParam implements ConnectionParam {
protected String validationQuery; protected String validationQuery;
protected String other; protected Map<String, String> other;
private Map<String, String> props = new HashMap<>();
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getDatabase() {
return database;
}
public void setDatabase(String database) {
this.database = database;
}
public String getJdbcUrl() {
return jdbcUrl;
}
public void setJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
}
public String getDriverLocation() {
return driverLocation;
}
public void setDriverLocation(String driverLocation) {
this.driverLocation = driverLocation;
}
public String getDriverClassName() {
return driverClassName;
}
public void setDriverClassName(String driverClassName) {
this.driverClassName = driverClassName;
}
public String getValidationQuery() {
return validationQuery;
}
public void setValidationQuery(String validationQuery) {
this.validationQuery = validationQuery;
}
public String getOther() {
return other;
}
public void setOther(String other) {
this.other = other;
}
public Map<String, String> getProps() {
return props;
}
public void setProps(Map<String, String> props) {
this.props = props;
}
} }

10
dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/test/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTaskTest.java

@ -498,7 +498,7 @@ public class DataQualityTaskTest {
+ "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\"," + "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\","
+ "\"user\":\"test\"," + "\"user\":\"test\","
+ "\"password\":\"test\"," + "\"password\":\"test\","
+ "\"other\":\"stringtype=unspecified&characterEncoding=UTF-8&allowMultiQueries=true\"}"); + "\"other\":{\"stringtype\": \"unspecified\", \"characterEncoding\" : \"UTF-8\", \"allowMultiQueries\": true}}");
dataQualityTaskExecutionContext.setStatisticsValueConnectorType("JDBC"); dataQualityTaskExecutionContext.setStatisticsValueConnectorType("JDBC");
dataQualityTaskExecutionContext.setStatisticsValueType(1); dataQualityTaskExecutionContext.setStatisticsValueType(1);
@ -509,7 +509,7 @@ public class DataQualityTaskTest {
+ "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\"," + "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\","
+ "\"user\":\"test\"," + "\"user\":\"test\","
+ "\"password\":\"test\"," + "\"password\":\"test\","
+ "\"other\":\"stringtype=unspecified&characterEncoding=UTF-8&allowMultiQueries=true\"}"); + "\"other\":{\"stringtype\": \"unspecified\", \"characterEncoding\" : \"UTF-8\", \"allowMultiQueries\": true}}");
dataQualityTaskExecutionContext.setCompareWithFixedValue(true); dataQualityTaskExecutionContext.setCompareWithFixedValue(true);
@ -796,7 +796,7 @@ public class DataQualityTaskTest {
+ "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\"," + "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\","
+ "\"user\":\"test\"," + "\"user\":\"test\","
+ "\"password\":\"test\"," + "\"password\":\"test\","
+ "\"other\":\"stringtype=unspecified&characterEncoding=UTF-8&allowMultiQueries=true\"}"); + "\"other\":{\"stringtype\": \"unspecified\", \"characterEncoding\": \"UTF-8\", \"allowMultiQueries\": true}}");
String expect = "{\"name\":\"跨表值比对\",\"env\":{\"type\":\"batch\",\"config\":null},\"readers\"" String expect = "{\"name\":\"跨表值比对\",\"env\":{\"type\":\"batch\",\"config\":null},\"readers\""
+ ":[{\"type\":\"JDBC\",\"config\":{\"database\":\"test\",\"password\":\"test\",\"driver\":" + ":[{\"type\":\"JDBC\",\"config\":{\"database\":\"test\",\"password\":\"test\",\"driver\":"
@ -1122,7 +1122,7 @@ public class DataQualityTaskTest {
+ "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\"," + "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\","
+ "\"user\":\"test\"," + "\"user\":\"test\","
+ "\"password\":\"test\"," + "\"password\":\"test\","
+ "\"other\":\"stringtype=unspecified&characterEncoding=UTF-8&allowMultiQueries=true\"}"); + "\"other\":{\"stringtype\": \"unspecified\", \"characterEncoding\": \"UTF-8\", \"allowMultiQueries\": true}}");
dataQualityTaskExecutionContext.setStatisticsValueConnectorType("JDBC"); dataQualityTaskExecutionContext.setStatisticsValueConnectorType("JDBC");
dataQualityTaskExecutionContext.setStatisticsValueType(1); dataQualityTaskExecutionContext.setStatisticsValueType(1);
@ -1133,7 +1133,7 @@ public class DataQualityTaskTest {
+ "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\"," + "\"jdbcUrl\":\"jdbc:postgresql://localhost:5432/dolphinscheduler\","
+ "\"user\":\"test\"," + "\"user\":\"test\","
+ "\"password\":\"test\"," + "\"password\":\"test\","
+ "\"other\":\"stringtype=unspecified&characterEncoding=UTF-8&allowMultiQueries=true\"}"); + "\"other\":{\"stringtype\": \"unspecified\", \"characterEncoding\": \"UTF-8\", \"allowMultiQueries\": true }}");
dataQualityTaskExecutionContext.setRuleName("跨表准确性"); dataQualityTaskExecutionContext.setRuleName("跨表准确性");
dataQualityTaskExecutionContext.setRuleType(RuleType.MULTI_TABLE_ACCURACY.getCode()); dataQualityTaskExecutionContext.setRuleType(RuleType.MULTI_TABLE_ACCURACY.getCode());

Loading…
Cancel
Save