ronyang1985
3 years ago
committed by
GitHub
21 changed files with 522 additions and 5 deletions
@ -0,0 +1,37 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.redshift; |
||||
|
||||
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; |
||||
|
||||
public class RedshiftConnectionParam extends BaseConnectionParam { |
||||
@Override |
||||
public String toString() { |
||||
return "RedshiftConnectionParam{" |
||||
+ "user='" + user + '\'' |
||||
+ ", password='" + password + '\'' |
||||
+ ", address='" + address + '\'' |
||||
+ ", database='" + database + '\'' |
||||
+ ", jdbcUrl='" + jdbcUrl + '\'' |
||||
+ ", driverLocation='" + driverLocation + '\'' |
||||
+ ", driverClassName='" + driverClassName + '\'' |
||||
+ ", validationQuery='" + validationQuery + '\'' |
||||
+ ", other='" + other + '\'' |
||||
+ '}'; |
||||
} |
||||
} |
@ -0,0 +1,43 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.redshift; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; |
||||
import org.apache.dolphinscheduler.spi.enums.DbType; |
||||
|
||||
public class RedshiftDataSourceParamDTO extends BaseDataSourceParamDTO { |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return "RedshiftDataSourceParamDTO{" |
||||
+ "name='" + name + '\'' |
||||
+ ", note='" + note + '\'' |
||||
+ ", host='" + host + '\'' |
||||
+ ", port=" + port |
||||
+ ", database='" + database + '\'' |
||||
+ ", userName='" + userName + '\'' |
||||
+ ", password='" + password + '\'' |
||||
+ ", other='" + other + '\'' |
||||
+ '}'; |
||||
} |
||||
|
||||
@Override |
||||
public DbType getType() { |
||||
return DbType.REDSHIFT; |
||||
} |
||||
} |
@ -0,0 +1,140 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.redshift; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.AbstractDataSourceProcessor; |
||||
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO; |
||||
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; |
||||
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; |
||||
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; |
||||
import org.apache.dolphinscheduler.spi.enums.DbType; |
||||
import org.apache.dolphinscheduler.spi.utils.Constants; |
||||
import org.apache.dolphinscheduler.spi.utils.JSONUtils; |
||||
|
||||
import org.apache.commons.collections4.MapUtils; |
||||
import org.apache.commons.lang.StringUtils; |
||||
|
||||
import java.sql.Connection; |
||||
import java.sql.DriverManager; |
||||
import java.sql.SQLException; |
||||
import java.util.ArrayList; |
||||
import java.util.LinkedHashMap; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
public class RedshiftDataSourceProcessor extends AbstractDataSourceProcessor { |
||||
|
||||
@Override |
||||
public BaseDataSourceParamDTO createDatasourceParamDTO(String connectionJson) { |
||||
RedshiftConnectionParam |
||||
connectionParams = (RedshiftConnectionParam) createConnectionParams(connectionJson); |
||||
|
||||
String[] hostSeperator = connectionParams.getAddress().split(Constants.DOUBLE_SLASH); |
||||
String[] hostPortArray = hostSeperator[hostSeperator.length - 1].split(Constants.COMMA); |
||||
|
||||
RedshiftDataSourceParamDTO |
||||
redshiftDatasourceParamDTO = new RedshiftDataSourceParamDTO(); |
||||
redshiftDatasourceParamDTO.setPort(Integer.parseInt(hostPortArray[0].split(Constants.COLON)[1])); |
||||
redshiftDatasourceParamDTO.setHost(hostPortArray[0].split(Constants.COLON)[0]); |
||||
redshiftDatasourceParamDTO.setDatabase(connectionParams.getDatabase()); |
||||
redshiftDatasourceParamDTO.setUserName(connectionParams.getUser()); |
||||
redshiftDatasourceParamDTO.setOther(parseOther(connectionParams.getOther())); |
||||
|
||||
return redshiftDatasourceParamDTO; |
||||
} |
||||
|
||||
@Override |
||||
public BaseConnectionParam createConnectionParams(BaseDataSourceParamDTO datasourceParam) { |
||||
RedshiftDataSourceParamDTO redshiftParam = (RedshiftDataSourceParamDTO) datasourceParam; |
||||
String address = String.format("%s%s:%s", Constants.JDBC_REDSHIFT, redshiftParam.getHost(), redshiftParam.getPort()); |
||||
String jdbcUrl = address + Constants.SLASH + redshiftParam.getDatabase(); |
||||
|
||||
RedshiftConnectionParam |
||||
redshiftConnectionParam = new RedshiftConnectionParam(); |
||||
redshiftConnectionParam.setUser(redshiftParam.getUserName()); |
||||
redshiftConnectionParam.setPassword(PasswordUtils.encodePassword(redshiftParam.getPassword())); |
||||
redshiftConnectionParam.setOther(transformOther(redshiftParam.getOther())); |
||||
redshiftConnectionParam.setAddress(address); |
||||
redshiftConnectionParam.setJdbcUrl(jdbcUrl); |
||||
redshiftConnectionParam.setDatabase(redshiftParam.getDatabase()); |
||||
redshiftConnectionParam.setDriverClassName(getDatasourceDriver()); |
||||
redshiftConnectionParam.setValidationQuery(getValidationQuery()); |
||||
redshiftConnectionParam.setProps(redshiftParam.getOther()); |
||||
|
||||
return redshiftConnectionParam; |
||||
} |
||||
|
||||
@Override |
||||
public ConnectionParam createConnectionParams(String connectionJson) { |
||||
return JSONUtils.parseObject(connectionJson, RedshiftConnectionParam.class); |
||||
} |
||||
|
||||
@Override |
||||
public String getDatasourceDriver() { |
||||
return Constants.COM_REDSHIFT_JDBC_DRIVER; |
||||
} |
||||
|
||||
@Override |
||||
public String getValidationQuery() { |
||||
return Constants.REDHIFT_VALIDATION_QUERY; |
||||
} |
||||
|
||||
@Override |
||||
public String getJdbcUrl(ConnectionParam connectionParam) { |
||||
RedshiftConnectionParam |
||||
redshiftConnectionParam = (RedshiftConnectionParam) connectionParam; |
||||
if (!StringUtils.isEmpty(redshiftConnectionParam.getOther())) { |
||||
return String.format("%s?%s", redshiftConnectionParam.getJdbcUrl(), redshiftConnectionParam.getOther()); |
||||
} |
||||
return redshiftConnectionParam.getJdbcUrl(); |
||||
} |
||||
|
||||
@Override |
||||
public Connection getConnection(ConnectionParam connectionParam) throws ClassNotFoundException, SQLException { |
||||
RedshiftConnectionParam redshiftConnectionParam = (RedshiftConnectionParam) connectionParam; |
||||
Class.forName(getDatasourceDriver()); |
||||
return DriverManager.getConnection(getJdbcUrl(connectionParam), |
||||
redshiftConnectionParam.getUser(), PasswordUtils.decodePassword(redshiftConnectionParam.getPassword())); |
||||
} |
||||
|
||||
@Override |
||||
public DbType getDbType() { |
||||
return DbType.REDSHIFT; |
||||
} |
||||
|
||||
private String transformOther(Map<String, String> otherMap) { |
||||
if (MapUtils.isNotEmpty(otherMap)) { |
||||
List<String> list = new ArrayList<>(otherMap.size()); |
||||
otherMap.forEach((key, value) -> list.add(String.format("%s=%s", key, value))); |
||||
return String.join(Constants.SEMICOLON, list); |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
private Map<String, String> parseOther(String other) { |
||||
Map<String, String> otherMap = new LinkedHashMap<>(); |
||||
if (StringUtils.isEmpty(other)) { |
||||
return otherMap; |
||||
} |
||||
String[] configs = other.split(Constants.SEMICOLON); |
||||
for (String config : configs) { |
||||
otherMap.put(config.split(Constants.EQUAL_SIGN)[0], config.split(Constants.EQUAL_SIGN)[1]); |
||||
} |
||||
return otherMap; |
||||
} |
||||
} |
@ -0,0 +1,98 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.datasource.api.datasource.redshift; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider; |
||||
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils; |
||||
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils; |
||||
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils; |
||||
import org.apache.dolphinscheduler.spi.enums.DbType; |
||||
import org.apache.dolphinscheduler.spi.utils.Constants; |
||||
|
||||
import java.sql.DriverManager; |
||||
import java.util.HashMap; |
||||
import java.util.Map; |
||||
|
||||
import org.junit.Assert; |
||||
import org.junit.Test; |
||||
import org.junit.runner.RunWith; |
||||
import org.mockito.Mockito; |
||||
import org.powermock.api.mockito.PowerMockito; |
||||
import org.powermock.core.classloader.annotations.PrepareForTest; |
||||
import org.powermock.modules.junit4.PowerMockRunner; |
||||
|
||||
@RunWith(PowerMockRunner.class) |
||||
@PrepareForTest({Class.class, DriverManager.class, DataSourceUtils.class, CommonUtils.class, DataSourceClientProvider.class, PasswordUtils.class}) |
||||
public class RedshiftDataSourceProcessorTest { |
||||
|
||||
private RedshiftDataSourceProcessor redshiftDatasourceProcessor = new RedshiftDataSourceProcessor(); |
||||
|
||||
@Test |
||||
public void testCreateConnectionParams() { |
||||
Map<String, String> props = new HashMap<>(); |
||||
props.put("serverTimezone", "utc"); |
||||
RedshiftDataSourceParamDTO redshiftDatasourceParamDTO = new RedshiftDataSourceParamDTO(); |
||||
redshiftDatasourceParamDTO.setHost("localhost"); |
||||
redshiftDatasourceParamDTO.setPort(5439); |
||||
redshiftDatasourceParamDTO.setDatabase("dev"); |
||||
redshiftDatasourceParamDTO.setUserName("awsuser"); |
||||
redshiftDatasourceParamDTO.setPassword("123456"); |
||||
redshiftDatasourceParamDTO.setOther(props); |
||||
PowerMockito.mockStatic(PasswordUtils.class); |
||||
PowerMockito.when(PasswordUtils.encodePassword(Mockito.anyString())).thenReturn("test"); |
||||
RedshiftConnectionParam connectionParams = (RedshiftConnectionParam) redshiftDatasourceProcessor |
||||
.createConnectionParams(redshiftDatasourceParamDTO); |
||||
Assert.assertEquals("jdbc:redshift://localhost:5439", connectionParams.getAddress()); |
||||
Assert.assertEquals("jdbc:redshift://localhost:5439/dev", connectionParams.getJdbcUrl()); |
||||
} |
||||
|
||||
@Test |
||||
public void testCreateConnectionParams2() { |
||||
String connectionJson = "{\"user\":\"awsuser\",\"password\":\"123456\",\"address\":\"jdbc:redshift://localhost:5439\"" |
||||
+ ",\"database\":\"dev\",\"jdbcUrl\":\"jdbc:redshift://localhost:5439/dev\"}"; |
||||
RedshiftConnectionParam connectionParams = (RedshiftConnectionParam) redshiftDatasourceProcessor |
||||
.createConnectionParams(connectionJson); |
||||
Assert.assertNotNull(connectionParams); |
||||
Assert.assertEquals("awsuser", connectionParams.getUser()); |
||||
} |
||||
|
||||
@Test |
||||
public void testGetDatasourceDriver() { |
||||
Assert.assertEquals(Constants.COM_REDSHIFT_JDBC_DRIVER, redshiftDatasourceProcessor.getDatasourceDriver()); |
||||
} |
||||
|
||||
@Test |
||||
public void testGetJdbcUrl() { |
||||
RedshiftConnectionParam redshiftConnectionParam = new RedshiftConnectionParam(); |
||||
redshiftConnectionParam.setJdbcUrl("jdbc:redshift://localhost:5439/default"); |
||||
redshiftConnectionParam.setOther("DSILogLevel=6;defaultRowFetchSize=100"); |
||||
Assert.assertEquals("jdbc:redshift://localhost:5439/default?DSILogLevel=6;defaultRowFetchSize=100", |
||||
redshiftDatasourceProcessor.getJdbcUrl(redshiftConnectionParam)); |
||||
|
||||
} |
||||
|
||||
@Test |
||||
public void testGetDbType() { |
||||
Assert.assertEquals(DbType.REDSHIFT, redshiftDatasourceProcessor.getDbType()); |
||||
} |
||||
|
||||
@Test |
||||
public void testGetValidationQuery() { |
||||
Assert.assertEquals(Constants.REDHIFT_VALIDATION_QUERY, redshiftDatasourceProcessor.getValidationQuery()); |
||||
} |
||||
} |
@ -0,0 +1,44 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" |
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
<parent> |
||||
<artifactId>dolphinscheduler-datasource-plugin</artifactId> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<version>2.0.4-SNAPSHOT</version> |
||||
</parent> |
||||
<modelVersion>4.0.0</modelVersion> |
||||
|
||||
<artifactId>dolphinscheduler-datasource-redshift</artifactId> |
||||
<packaging>jar</packaging> |
||||
<dependencies> |
||||
|
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-spi</artifactId> |
||||
<scope>provided</scope> |
||||
</dependency> |
||||
|
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-datasource-api</artifactId> |
||||
</dependency> |
||||
|
||||
</dependencies> |
||||
</project> |
@ -0,0 +1,30 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.datasource.redshift; |
||||
|
||||
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; |
||||
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; |
||||
import org.apache.dolphinscheduler.spi.datasource.DataSourceClient; |
||||
import org.apache.dolphinscheduler.spi.enums.DbType; |
||||
|
||||
public class RedshiftDataSourceChannel implements DataSourceChannel { |
||||
@Override |
||||
public DataSourceClient createDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { |
||||
return new RedshiftDataSourceClient(baseConnectionParam,dbType); |
||||
} |
||||
} |
@ -0,0 +1,36 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.datasource.redshift; |
||||
|
||||
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel; |
||||
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory; |
||||
|
||||
import com.google.auto.service.AutoService; |
||||
|
||||
@AutoService(DataSourceChannelFactory.class) |
||||
public class RedshiftDataSourceChannelFactory implements DataSourceChannelFactory { |
||||
@Override |
||||
public DataSourceChannel create() { |
||||
return new RedshiftDataSourceChannel(); |
||||
} |
||||
|
||||
@Override |
||||
public String getName() { |
||||
return "redshift"; |
||||
} |
||||
} |
@ -0,0 +1,28 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.datasource.redshift; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.datasource.api.client.CommonDataSourceClient; |
||||
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; |
||||
import org.apache.dolphinscheduler.spi.enums.DbType; |
||||
|
||||
public class RedshiftDataSourceClient extends CommonDataSourceClient { |
||||
public RedshiftDataSourceClient(BaseConnectionParam baseConnectionParam, DbType dbType) { |
||||
super(baseConnectionParam, dbType); |
||||
} |
||||
} |
Loading…
Reference in new issue