Browse Source

Refactor BaseDataSource and unified handling of JDBC resource releases. (#2184)

* Refactor BaseDataSource.isConnectable().

* Add BaseDataSource test.

* Unified handling of JDBC resource releases.

* Fix sonar check error.

* Remove useless logger.

* Refactor BaseDataSource.getJdbcUrl().

* Modify ConnectionUtils.releaseResource().

* Add javadoc

* Remove useless method.

* Add BaseDataSourceTest UT
pull/2/head
t1mon 5 years ago committed by GitHub
parent
commit
efde8fd4e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 103
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ConnectionUtils.java
  2. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/MonitorDBDao.java
  3. 35
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
  4. 109
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSource.java
  5. 47
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ClickHouseDataSource.java
  6. 46
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/DB2ServerDataSource.java
  7. 51
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java
  8. 44
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java
  9. 47
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/OracleDataSource.java
  10. 49
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/PostgreDataSource.java
  11. 61
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SQLServerDataSource.java
  12. 52
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SparkDataSource.java
  13. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/MysqlUpgradeDao.java
  14. 4
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/PostgresqlUpgradeDao.java
  15. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java
  16. 115
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSourceTest.java
  17. 22
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java
  18. 1
      pom.xml

103
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ConnectionUtils.java

@ -16,86 +16,35 @@
*/
package org.apache.dolphinscheduler.common.utils;
import java.util.Arrays;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
public class ConnectionUtils {
public static final Logger logger = LoggerFactory.getLogger(ConnectionUtils.class);
private static ConnectionUtils instance;
ConnectionUtils() {
}
public static ConnectionUtils getInstance() {
if (null == instance) {
syncInit();
}
return instance;
}
private static synchronized void syncInit() {
if (instance == null) {
instance = new ConnectionUtils();
}
}
public void release(ResultSet rs, Statement stmt, Connection conn) {
try {
if (rs != null) {
rs.close();
rs = null;
}
} catch (SQLException e) {
logger.error(e.getMessage(),e);
} finally {
try {
if (stmt != null) {
stmt.close();
stmt = null;
}
} catch (SQLException e) {
logger.error(e.getMessage(),e);
} finally {
try {
if (conn != null) {
conn.close();
conn = null;
}
} catch (SQLException e) {
logger.error(e.getMessage(),e);
}
}
}
}
public static void releaseResource(ResultSet rs, PreparedStatement ps, Connection conn) {
ConnectionUtils.getInstance().release(rs,ps,conn);
if (null != rs) {
try {
rs.close();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
}
}
if (null != ps) {
try {
ps.close();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
}
}
if (null != conn) {
try {
conn.close();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
}
}
}
public static final Logger logger = LoggerFactory.getLogger(ConnectionUtils.class);
private ConnectionUtils() {
throw new IllegalStateException("ConnectionUtils class");
}
/**
* release resource
* @param resources resources
*/
public static void releaseResource(AutoCloseable... resources) {
if (resources == null || resources.length == 0) {
return;
}
Arrays.stream(resources).filter(Objects::nonNull)
.forEach(resource -> {
try {
resource.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
});
}
}

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/MonitorDBDao.java

@ -18,10 +18,10 @@ package org.apache.dolphinscheduler.dao;
import com.alibaba.druid.pool.DruidDataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.dao.entity.MonitorRecord;
import org.apache.dolphinscheduler.dao.utils.MysqlPerformance;
import org.apache.dolphinscheduler.dao.utils.PostgrePerformance;
@ -63,13 +63,7 @@ public class MonitorDBDao {
}catch (Exception e) {
logger.error("SQLException: {}", e.getMessage(), e);
}finally {
try {
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
logger.error("SQLException ", e);
}
ConnectionUtils.releaseResource(conn);
}
return monitorRecord;
}

35
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.dao;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.TaskRecordStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.ConnectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskRecord;
@ -163,14 +164,14 @@ public class TaskRecordDao {
sql += getWhereString(filterMap);
pstmt = conn.prepareStatement(sql);
rs = pstmt.executeQuery();
while (rs.next()) {
while (rs.next()){
count = rs.getInt("count");
break;
}
} catch (SQLException e) {
logger.error("Exception ", e);
} finally {
closeResource(rs, pstmt, conn);
}finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return count;
}
@ -254,8 +255,8 @@ public class TaskRecordDao {
}
} catch (SQLException e) {
logger.error("Exception ", e);
} finally {
closeResource(rs, pstmt, conn);
}finally {
ConnectionUtils.releaseResource(rs, pstmt, conn);
}
return recordList;
}
@ -292,28 +293,4 @@ public class TaskRecordDao {
}
}
private static void closeResource(ResultSet rs, PreparedStatement pstmt, Connection conn) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
logger.error("Exception ", e);
}
}
if (pstmt != null) {
try {
pstmt.close();
} catch (SQLException e) {
logger.error("Exception ", e);
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
logger.error("Exception ", e);
}
}
}
}

109
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSource.java

@ -16,10 +16,21 @@
*/
package org.apache.dolphinscheduler.dao.datasource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* data source base class
*/
public abstract class BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(BaseDataSource.class);
/**
* user name
*/
@ -57,17 +68,105 @@ public abstract class BaseDataSource {
public void setPrincipal(String principal) {
this.principal = principal;
}
/**
* test whether the data source can be connected successfully
* @throws Exception
* @return driver class
*/
public abstract String driverClassSelector();
/**
* @return db type
*/
public abstract void isConnectable() throws Exception;
public abstract DbType dbTypeSelector();
/**
* gets the JDBC url for the data source connection
* @return
*/
public abstract String getJdbcUrl();
public String getJdbcUrl() {
StringBuilder jdbcUrl = new StringBuilder(getAddress());
appendDatabase(jdbcUrl);
appendPrincipal(jdbcUrl);
appendOther(jdbcUrl);
return jdbcUrl.toString();
}
/**
* append database
* @param jdbcUrl jdbc url
*/
private void appendDatabase(StringBuilder jdbcUrl) {
if (dbTypeSelector() == DbType.SQLSERVER) {
jdbcUrl.append(";databaseName=").append(getDatabase());
} else {
if (getAddress().lastIndexOf('/') != (jdbcUrl.length() - 1)) {
jdbcUrl.append("/");
}
jdbcUrl.append(getDatabase());
}
}
/**
* append principal
* @param jdbcUrl jdbc url
*/
private void appendPrincipal(StringBuilder jdbcUrl) {
boolean tag = dbTypeSelector() == DbType.HIVE || dbTypeSelector() == DbType.SPARK;
if (tag && StringUtils.isNotEmpty(getPrincipal())) {
jdbcUrl.append(";principal=").append(getPrincipal());
}
}
/**
* append other
* @param jdbcUrl jdbc url
*/
private void appendOther(StringBuilder jdbcUrl) {
if (StringUtils.isNotEmpty(getOther())) {
String separator = "";
switch (dbTypeSelector()) {
case CLICKHOUSE:
case MYSQL:
case ORACLE:
case POSTGRESQL:
separator = "?";
break;
case DB2:
separator = ":";
break;
case HIVE:
case SPARK:
case SQLSERVER:
separator = ";";
break;
default:
logger.error("Db type mismatch!");
}
jdbcUrl.append(separator).append(getOther());
}
}
/**
* test whether the data source can be connected successfully
*/
public void isConnectable() {
Connection con = null;
try {
Class.forName(driverClassSelector());
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} catch (ClassNotFoundException | SQLException e) {
logger.error("Get connection error: {}", e.getMessage());
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
}
}
}
public String getUser() {
return user;

47
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ClickHouseDataSource.java

@ -17,59 +17,26 @@
package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.dolphinscheduler.common.enums.DbType;
/**
* data source of ClickHouse
*/
public class ClickHouseDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(ClickHouseDataSource.class);
/**
* gets the JDBC url for the data source connection
* @return
* @return driver class
*/
@Override
public String getJdbcUrl() {
String jdbcUrl = getAddress();
if (jdbcUrl.lastIndexOf('/') != (jdbcUrl.length() - 1)) {
jdbcUrl += "/";
}
jdbcUrl += getDatabase();
if (StringUtils.isNotEmpty(getOther())) {
jdbcUrl += "?" + getOther();
}
return jdbcUrl;
public String driverClassSelector() {
return Constants.COM_CLICKHOUSE_JDBC_DRIVER;
}
/**
* test whether the data source can be connected successfully
* @throws Exception
* @return db type
*/
@Override
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName(Constants.COM_CLICKHOUSE_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("ClickHouse datasource try conn close conn error", e);
}
}
}
public DbType dbTypeSelector() {
return DbType.CLICKHOUSE;
}
}

46
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/DB2ServerDataSource.java

@ -17,58 +17,26 @@
package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.dolphinscheduler.common.enums.DbType;
/**
* data source of DB2 Server
*/
public class DB2ServerDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(DB2ServerDataSource.class);
/**
* gets the JDBC url for the data source connection
* @return
* @return driver class
*/
@Override
public String getJdbcUrl() {
String jdbcUrl = getAddress();
if (jdbcUrl.lastIndexOf("/") != (jdbcUrl.length() - 1)) {
jdbcUrl += "/";
}
jdbcUrl += getDatabase();
if (StringUtils.isNotEmpty(getOther())) {
jdbcUrl += ":" + getOther();
}
return jdbcUrl;
public String driverClassSelector() {
return Constants.COM_DB2_JDBC_DRIVER;
}
/**
* test whether the data source can be connected successfully
* @throws Exception
* @return db type
*/
@Override
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName(Constants.COM_DB2_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("DB2 Server datasource try conn close conn error", e);
}
}
}
public DbType dbTypeSelector() {
return DbType.DB2;
}
}

51
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java

@ -17,63 +17,26 @@
package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.dolphinscheduler.common.enums.DbType;
/**
* data source of hive
*/
public class HiveDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(HiveDataSource.class);
/**
* gets the JDBC url for the data source connection
* @return
* @return driver class
*/
@Override
public String getJdbcUrl() {
String jdbcUrl = getAddress();
if (jdbcUrl.lastIndexOf('/') != (jdbcUrl.length() - 1)) {
jdbcUrl += "/";
}
jdbcUrl += getDatabase();
if (StringUtils.isNotEmpty(getPrincipal())){
jdbcUrl += ";principal=" + getPrincipal();
}
if (StringUtils.isNotEmpty(getOther())) {
jdbcUrl += ";" + getOther();
}
return jdbcUrl;
public String driverClassSelector() {
return Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER;
}
/**
* test whether the data source can be connected successfully
* @throws Exception
* @return db type
*/
@Override
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), "");
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("hive datasource try conn close conn error", e);
}
}
}
public DbType dbTypeSelector() {
return DbType.HIVE;
}
}

44
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java

@ -17,57 +17,27 @@
package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.dolphinscheduler.common.enums.DbType;
/**
* data source of mySQL
*/
public class MySQLDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(MySQLDataSource.class);
/**
* gets the JDBC url for the data source connection
* @return
* @return driver class
*/
@Override
public String getJdbcUrl() {
String address = getAddress();
if (address.lastIndexOf('/') != (address.length() - 1)) {
address += "/";
}
String jdbcUrl = address + getDatabase();
if (StringUtils.isNotEmpty(getOther())) {
jdbcUrl += "?" + getOther();
}
return jdbcUrl;
public String driverClassSelector() {
return Constants.COM_MYSQL_JDBC_DRIVER;
}
/**
* test whether the data source can be connected successfully
* @throws Exception
* @return db type
*/
@Override
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName(Constants.COM_MYSQL_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("Mysql datasource try conn close conn error", e);
}
}
}
public DbType dbTypeSelector() {
return DbType.MYSQL;
}
}

47
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/OracleDataSource.java

@ -17,59 +17,26 @@
package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.dolphinscheduler.common.enums.DbType;
/**
* data source of Oracle
*/
public class OracleDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(OracleDataSource.class);
/**
* gets the JDBC url for the data source connection
* @return
* @return driver class
*/
@Override
public String getJdbcUrl() {
String jdbcUrl = getAddress();
if (jdbcUrl.lastIndexOf("/") != (jdbcUrl.length() - 1)) {
jdbcUrl += "/";
}
jdbcUrl += getDatabase();
if (StringUtils.isNotEmpty(getOther())) {
jdbcUrl += "?" + getOther();
}
return jdbcUrl;
public String driverClassSelector() {
return Constants.COM_ORACLE_JDBC_DRIVER;
}
/**
* test whether the data source can be connected successfully
* @throws Exception
* @return db type
*/
@Override
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName(Constants.COM_ORACLE_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("Oracle datasource try conn close conn error", e);
}
}
}
public DbType dbTypeSelector() {
return DbType.ORACLE;
}
}

49
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/PostgreDataSource.java

@ -17,61 +17,26 @@
package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.dolphinscheduler.common.enums.DbType;
/**
* data source of postgreSQL
*/
public class PostgreDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(PostgreDataSource.class);
/**
* gets the JDBC url for the data source connection
* @return
* @return driver class
*/
@Override
public String getJdbcUrl() {
String jdbcUrl = getAddress();
if (jdbcUrl.lastIndexOf('/') != (jdbcUrl.length() - 1)) {
jdbcUrl += "/";
}
jdbcUrl += getDatabase();
if (StringUtils.isNotEmpty(getOther())) {
jdbcUrl += "?" + getOther();
}
return jdbcUrl;
public String driverClassSelector() {
return Constants.ORG_POSTGRESQL_DRIVER;
}
/**
* test whether the data source can be connected successfully
* @throws Exception
* @return db type
*/
@Override
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName(Constants.ORG_POSTGRESQL_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("Postgre datasource try conn close conn error", e);
}
}
}
public DbType dbTypeSelector() {
return DbType.POSTGRESQL;
}
}

61
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SQLServerDataSource.java

@ -17,55 +17,26 @@
package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.dolphinscheduler.common.enums.DbType;
/**
* data source of SQL Server
*/
public class SQLServerDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(SQLServerDataSource.class);
/**
* gets the JDBC url for the data source connection
* @return
*/
@Override
public String getJdbcUrl() {
String jdbcUrl = getAddress();
jdbcUrl += ";databaseName=" + getDatabase();
if (StringUtils.isNotEmpty(getOther())) {
jdbcUrl += ";" + getOther();
}
return jdbcUrl;
}
/**
* test whether the data source can be connected successfully
* @throws Exception
*/
@Override
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName(Constants.COM_SQLSERVER_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("SQL Server datasource try conn close conn error", e);
}
}
}
}
/**
* @return driver class
*/
@Override
public String driverClassSelector() {
return Constants.COM_SQLSERVER_JDBC_DRIVER;
}
/**
* @return db type
*/
@Override
public DbType dbTypeSelector() {
return DbType.SQLSERVER;
}
}

52
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SparkDataSource.java

@ -17,64 +17,26 @@
package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.dolphinscheduler.common.enums.DbType;
/**
* data source of spark
*/
public class SparkDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(SparkDataSource.class);
/**
* gets the JDBC url for the data source connection
* @return
* @return driver class
*/
@Override
public String getJdbcUrl() {
String jdbcUrl = getAddress();
if (jdbcUrl.lastIndexOf("/") != (jdbcUrl.length() - 1)) {
jdbcUrl += "/";
}
jdbcUrl += getDatabase();
if (StringUtils.isNotEmpty(getPrincipal())){
jdbcUrl += ";principal=" + getPrincipal();
}
if (StringUtils.isNotEmpty(getOther())) {
jdbcUrl += ";" + getOther();
}
return jdbcUrl;
public String driverClassSelector() {
return Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER;
}
/**
* test whether the data source can be connected successfully
* @throws Exception
* @return db type
*/
@Override
public void isConnectable() throws Exception {
Connection con = null;
try {
Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), "");
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("Spark datasource try conn close conn error", e);
}
}
}
public DbType dbTypeSelector() {
return DbType.SPARK;
}
}

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/MysqlUpgradeDao.java

@ -66,7 +66,7 @@ public class MysqlUpgradeDao extends UpgradeDao {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
} finally {
ConnectionUtils.releaseResource(rs, null, conn);
ConnectionUtils.releaseResource(rs, conn);
}
}
@ -89,7 +89,7 @@ public class MysqlUpgradeDao extends UpgradeDao {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
} finally {
ConnectionUtils.releaseResource(null, null, conn);
ConnectionUtils.releaseResource(conn);
}
}

4
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/PostgresqlUpgradeDao.java

@ -96,7 +96,7 @@ public class PostgresqlUpgradeDao extends UpgradeDao {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
} finally {
ConnectionUtils.releaseResource(rs, null, conn);
ConnectionUtils.releaseResource(rs, conn);
}
}
@ -119,7 +119,7 @@ public class PostgresqlUpgradeDao extends UpgradeDao {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
} finally {
ConnectionUtils.releaseResource(rs, null, conn);
ConnectionUtils.releaseResource(rs, conn);
}

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/upgrade/UpgradeDao.java

@ -79,7 +79,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
logger.error(e.getMessage(),e);
return null;
}finally {
ConnectionUtils.releaseResource(null, null, conn);
ConnectionUtils.releaseResource(conn);
}
}
@ -160,7 +160,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
} finally {
ConnectionUtils.releaseResource(null, null, conn);
ConnectionUtils.releaseResource(conn);
}
@ -193,7 +193,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
} finally {
ConnectionUtils.releaseResource(null, null, conn);
ConnectionUtils.releaseResource(conn);
}
@ -329,7 +329,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
} finally {
ConnectionUtils.releaseResource(null, pstmt, conn);
ConnectionUtils.releaseResource(pstmt, conn);
}
}
@ -372,7 +372,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage(),e);
} finally {
ConnectionUtils.releaseResource(null, pstmt, conn);
ConnectionUtils.releaseResource(pstmt, conn);
}
}
@ -401,7 +401,7 @@ public abstract class UpgradeDao extends AbstractBaseDao {
logger.error(e.getMessage(),e);
throw new RuntimeException("sql: " + upgradeSQL, e);
} finally {
ConnectionUtils.releaseResource(null, pstmt, conn);
ConnectionUtils.releaseResource(pstmt, conn);
}
}

115
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSourceTest.java

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants;
import org.junit.Assert;
import org.junit.Test;
public class BaseDataSourceTest {
@Test
public void testDriverClassSelector() {
String mysqlDriverClass = new MySQLDataSource().driverClassSelector();
Assert.assertEquals(Constants.COM_MYSQL_JDBC_DRIVER, mysqlDriverClass);
String clickHouseDriverClass = new ClickHouseDataSource().driverClassSelector();
Assert.assertEquals(Constants.COM_CLICKHOUSE_JDBC_DRIVER, clickHouseDriverClass);
String db2ServerDriverClass = new DB2ServerDataSource().driverClassSelector();
Assert.assertEquals(Constants.COM_DB2_JDBC_DRIVER, db2ServerDriverClass);
String oracleDriverClass = new OracleDataSource().driverClassSelector();
Assert.assertEquals(Constants.COM_ORACLE_JDBC_DRIVER, oracleDriverClass);
String postgreDriverClass = new PostgreDataSource().driverClassSelector();
Assert.assertEquals(Constants.ORG_POSTGRESQL_DRIVER, postgreDriverClass);
String sqlServerDriverClass = new SQLServerDataSource().driverClassSelector();
Assert.assertEquals(Constants.COM_SQLSERVER_JDBC_DRIVER, sqlServerDriverClass);
String hiveDriverClass = new HiveDataSource().driverClassSelector();
Assert.assertEquals(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER, hiveDriverClass);
String sparkDriverClass = new SparkDataSource().driverClassSelector();
Assert.assertEquals(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER, sparkDriverClass);
}
@Test
public void testGetJdbcUrl() {
BaseDataSource hiveDataSource = new HiveDataSource();
hiveDataSource.setAddress("jdbc:hive2://127.0.0.1:10000");
hiveDataSource.setDatabase("test");
hiveDataSource.setPassword("123456");
hiveDataSource.setUser("test");
Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test", hiveDataSource.getJdbcUrl());
//set principal
hiveDataSource.setPrincipal("hive/test.com@TEST.COM");
Assert.assertEquals("jdbc:hive2://127.0.0.1:10000/test;principal=hive/test.com@TEST.COM",
hiveDataSource.getJdbcUrl());
//set fake other
hiveDataSource.setOther("charset=UTF-8");
Assert.assertEquals(
"jdbc:hive2://127.0.0.1:10000/test;principal=hive/test.com@TEST.COM;charset=UTF-8",
hiveDataSource.getJdbcUrl());
BaseDataSource clickHouseDataSource = new ClickHouseDataSource();
clickHouseDataSource.setAddress("jdbc:clickhouse://127.0.0.1:8123");
clickHouseDataSource.setDatabase("test");
clickHouseDataSource.setPassword("123456");
clickHouseDataSource.setUser("test");
Assert.assertEquals("jdbc:clickhouse://127.0.0.1:8123/test", clickHouseDataSource.getJdbcUrl());
//set fake principal
clickHouseDataSource.setPrincipal("fake principal");
Assert.assertEquals("jdbc:clickhouse://127.0.0.1:8123/test", clickHouseDataSource.getJdbcUrl());
//set fake other
clickHouseDataSource.setOther("charset=UTF-8");
Assert.assertEquals("jdbc:clickhouse://127.0.0.1:8123/test?charset=UTF-8",
clickHouseDataSource.getJdbcUrl());
BaseDataSource sqlServerDataSource = new SQLServerDataSource();
sqlServerDataSource.setAddress("jdbc:sqlserver://127.0.0.1:1433");
sqlServerDataSource.setDatabase("test");
sqlServerDataSource.setPassword("123456");
sqlServerDataSource.setUser("test");
Assert.assertEquals("jdbc:sqlserver://127.0.0.1:1433;databaseName=test",
sqlServerDataSource.getJdbcUrl());
//set fake principal
sqlServerDataSource.setPrincipal("fake principal");
Assert.assertEquals("jdbc:sqlserver://127.0.0.1:1433;databaseName=test",
sqlServerDataSource.getJdbcUrl());
//set fake other
sqlServerDataSource.setOther("charset=UTF-8");
Assert.assertEquals("jdbc:sqlserver://127.0.0.1:1433;databaseName=test;charset=UTF-8",
sqlServerDataSource.getJdbcUrl());
BaseDataSource db2DataSource = new DB2ServerDataSource();
db2DataSource.setAddress("jdbc:db2://127.0.0.1:50000");
db2DataSource.setDatabase("test");
db2DataSource.setPassword("123456");
db2DataSource.setUser("test");
Assert.assertEquals("jdbc:db2://127.0.0.1:50000/test", db2DataSource.getJdbcUrl());
//set fake principal
db2DataSource.setPrincipal("fake principal");
Assert.assertEquals("jdbc:db2://127.0.0.1:50000/test", db2DataSource.getJdbcUrl());
//set fake other
db2DataSource.setOther("charset=UTF-8");
Assert.assertEquals("jdbc:db2://127.0.0.1:50000/test:charset=UTF-8", db2DataSource.getJdbcUrl());
}
}

22
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java

@ -140,7 +140,6 @@ public class SqlTask extends AbstractTask {
dataSource.getUserId(),
dataSource.getConnectionParams());
Connection con = null;
List<String> createFuncs = null;
try {
// load class
@ -178,18 +177,10 @@ public class SqlTask extends AbstractTask {
}
// execute sql task
con = executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
executeFuncAndSql(mainSqlBinds, preStatementSqlBinds, postStatementSqlBinds, createFuncs);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw e;
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error(e.getMessage(),e);
}
}
}
}
@ -249,9 +240,8 @@ public class SqlTask extends AbstractTask {
* @param preStatementsBinds pre statements binds
* @param postStatementsBinds post statements binds
* @param createFuncs create functions
* @return Connection
*/
public Connection executeFuncAndSql(SqlBinds mainSqlBinds,
public void executeFuncAndSql(SqlBinds mainSqlBinds,
List<SqlBinds> preStatementsBinds,
List<SqlBinds> postStatementsBinds,
List<String> createFuncs){
@ -343,13 +333,9 @@ public class SqlTask extends AbstractTask {
logger.error(e.getMessage(),e);
throw new RuntimeException(e.getMessage());
} finally {
try {
connection.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
ConnectionUtils.releaseResource(connection);
}
return connection;
}
/**

1
pom.xml

@ -741,6 +741,7 @@
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>
<include>**/dao/datasource/BaseDataSourceTest.java</include>
<include>**/alert/utils/MailUtilsTest.java</include>
<include>**/dao/AlertDaoTest.java</include>
</includes>

Loading…
Cancel
Save