Browse Source

Merge remote-tracking branch 'upstream/dev' into json-split

pull/3/MERGE
lenboo 4 years ago
parent
commit
db5c3bb7f8
  1. 63
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java
  2. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  3. 192
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
  4. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java
  5. 108
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
  6. 14
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java
  7. 351
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSource.java
  8. 124
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java
  9. 52
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SQLServerDataSource.java
  10. 45
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SparkDataSource.java

63
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java

@ -14,12 +14,20 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.api.controller; package org.apache.dolphinscheduler.api.controller;
import io.swagger.annotations.Api; import static org.apache.dolphinscheduler.api.enums.Status.AUTHORIZED_DATA_SOURCE;
import io.swagger.annotations.ApiImplicitParam; import static org.apache.dolphinscheduler.api.enums.Status.CONNECTION_TEST_FAILURE;
import io.swagger.annotations.ApiImplicitParams; import static org.apache.dolphinscheduler.api.enums.Status.CONNECT_DATASOURCE_FAILURE;
import io.swagger.annotations.ApiOperation; import static org.apache.dolphinscheduler.api.enums.Status.CREATE_DATASOURCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_DATA_SOURCE_FAILURE;
import static org.apache.dolphinscheduler.api.enums.Status.KERBEROS_STARTUP_STATE;
import static org.apache.dolphinscheduler.api.enums.Status.QUERY_DATASOURCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.UNAUTHORIZED_DATASOURCE;
import static org.apache.dolphinscheduler.api.enums.Status.UPDATE_DATASOURCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.VERIFY_DATASOURCE_NAME_FAILURE;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.exceptions.ApiException;
import org.apache.dolphinscheduler.api.service.DataSourceService; import org.apache.dolphinscheduler.api.service.DataSourceService;
@ -30,16 +38,26 @@ import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import java.util.Map;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.GetMapping;
import springfox.documentation.annotations.ApiIgnore; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestAttribute;
import java.util.Map; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import static org.apache.dolphinscheduler.api.enums.Status.*; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import springfox.documentation.annotations.ApiIgnore;
/** /**
* data source controller * data source controller
@ -101,8 +119,7 @@ public class DataSourceController extends BaseController {
logger.info("login user {} create datasource name: {}, note: {}, type: {}, host: {}, port: {}, database : {}, principal: {}, userName : {}, connectType: {}, other: {}", logger.info("login user {} create datasource name: {}, note: {}, type: {}, host: {}, port: {}, database : {}, principal: {}, userName : {}, connectType: {}, other: {}",
loginUser.getUserName(), name, note, type, host, port, database, principal, userName, connectType, other); loginUser.getUserName(), name, note, type, host, port, database, principal, userName, connectType, other);
String parameter = dataSourceService.buildParameter(type, host, port, database, principal, userName, password, connectType, other); String parameter = dataSourceService.buildParameter(type, host, port, database, principal, userName, password, connectType, other);
Map<String, Object> result = dataSourceService.createDataSource(loginUser, name, note, type, parameter); return dataSourceService.createDataSource(loginUser, name, note, type, parameter);
return returnDataList(result);
} }
@ -156,8 +173,7 @@ public class DataSourceController extends BaseController {
logger.info("login user {} updateProcessInstance datasource name: {}, note: {}, type: {}, connectType: {}, other: {}", logger.info("login user {} updateProcessInstance datasource name: {}, note: {}, type: {}, connectType: {}, other: {}",
loginUser.getUserName(), name, note, type, connectType, other); loginUser.getUserName(), name, note, type, connectType, other);
String parameter = dataSourceService.buildParameter(type, host, port, database, principal, userName, password, connectType, other); String parameter = dataSourceService.buildParameter(type, host, port, database, principal, userName, password, connectType, other);
Map<String, Object> dataSource = dataSourceService.updateDataSource(id, loginUser, name, note, type, parameter); return dataSourceService.updateDataSource(id, loginUser, name, note, type, parameter);
return returnDataList(dataSource);
} }
/** /**
@ -281,15 +297,7 @@ public class DataSourceController extends BaseController {
logger.info("login user {}, connect datasource: {}, note: {}, type: {}, connectType: {}, other: {}", logger.info("login user {}, connect datasource: {}, note: {}, type: {}, connectType: {}, other: {}",
loginUser.getUserName(), name, note, type, connectType, other); loginUser.getUserName(), name, note, type, connectType, other);
String parameter = dataSourceService.buildParameter(type, host, port, database, principal, userName, password, connectType, other); String parameter = dataSourceService.buildParameter(type, host, port, database, principal, userName, password, connectType, other);
Boolean isConnection = dataSourceService.checkConnection(type, parameter); return dataSourceService.checkConnection(type, parameter);
Result result = new Result();
if (isConnection) {
putMsg(result, SUCCESS);
} else {
putMsg(result, CONNECT_DATASOURCE_FAILURE);
}
return result;
} }
/** /**
@ -309,16 +317,7 @@ public class DataSourceController extends BaseController {
public Result connectionTest(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, public Result connectionTest(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("id") int id) { @RequestParam("id") int id) {
logger.info("connection test, login user:{}, id:{}", loginUser.getUserName(), id); logger.info("connection test, login user:{}, id:{}", loginUser.getUserName(), id);
return dataSourceService.connectionTest(id);
Boolean isConnection = dataSourceService.connectionTest(id);
Result result = new Result();
if (isConnection) {
putMsg(result, SUCCESS);
} else {
putMsg(result, CONNECTION_TEST_FAILURE);
}
return result;
} }
/** /**

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -198,7 +198,7 @@ public enum Status {
CHECK_OS_TENANT_CODE_ERROR(10164, "Please enter the English os tenant code", "请输入英文操作系统租户"), CHECK_OS_TENANT_CODE_ERROR(10164, "Please enter the English os tenant code", "请输入英文操作系统租户"),
FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"), FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"),
TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"), TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"),
DATASOURCE_TYPE_NOT_EXIST(10167, "data source type not exist", "数据源类型不存在"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),

192
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java

@ -14,12 +14,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
@ -27,27 +24,36 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbConnectType; import org.apache.dolphinscheduler.common.enums.DbConnectType;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.datasource.*; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.datasource.OracleDataSource;
import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.Resource; import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation; import java.sql.Connection;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.sql.Connection; import com.baomidou.mybatisplus.core.metadata.IPage;
import java.sql.DriverManager; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import java.sql.SQLException; import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.*;
import static org.apache.dolphinscheduler.common.utils.PropertyUtils.getString;
/** /**
* datasource service * datasource service
@ -67,11 +73,9 @@ public class DataSourceService extends BaseService {
public static final String USER_NAME = "userName"; public static final String USER_NAME = "userName";
public static final String OTHER = "other"; public static final String OTHER = "other";
@Autowired @Autowired
private DataSourceMapper dataSourceMapper; private DataSourceMapper dataSourceMapper;
@Autowired @Autowired
private DataSourceUserMapper datasourceUserMapper; private DataSourceUserMapper datasourceUserMapper;
@ -85,24 +89,16 @@ public class DataSourceService extends BaseService {
* @param parameter datasource parameters * @param parameter datasource parameters
* @return create result code * @return create result code
*/ */
public Map<String, Object> createDataSource(User loginUser, String name, String desc, DbType type, String parameter) { public Result<Object> createDataSource(User loginUser, String name, String desc, DbType type, String parameter) {
Map<String, Object> result = new HashMap<>(); Result<Object> result = new Result<>();
// check name can use or not // check name can use or not
if (checkName(name)) { if (checkName(name)) {
putMsg(result, Status.DATASOURCE_EXIST); putMsg(result, Status.DATASOURCE_EXIST);
return result; return result;
} }
Boolean isConnection = checkConnection(type, parameter); Result<Object> isConnection = checkConnection(type, parameter);
if (!isConnection) { if (Status.SUCCESS.getCode() != isConnection.getCode()) {
logger.info("connect failed, type:{}, parameter:{}", type, parameter);
putMsg(result, Status.DATASOURCE_CONNECT_FAILED);
return result;
}
BaseDataSource datasource = DataSourceFactory.getDatasource(type, parameter);
if (datasource == null) {
putMsg(result, Status.REQUEST_PARAMS_NOT_VALID_ERROR, parameter);
return result; return result;
} }
@ -125,7 +121,6 @@ public class DataSourceService extends BaseService {
return result; return result;
} }
/** /**
* updateProcessInstance datasource * updateProcessInstance datasource
* *
@ -137,9 +132,9 @@ public class DataSourceService extends BaseService {
* @param id data source id * @param id data source id
* @return update result code * @return update result code
*/ */
public Map<String, Object> updateDataSource(int id, User loginUser, String name, String desc, DbType type, String parameter) { public Result<Object> updateDataSource(int id, User loginUser, String name, String desc, DbType type, String parameter) {
Map<String, Object> result = new HashMap<>(); Result<Object> result = new Result<>();
// determine whether the data source exists // determine whether the data source exists
DataSource dataSource = dataSourceMapper.selectById(id); DataSource dataSource = dataSourceMapper.selectById(id);
if (dataSource == null) { if (dataSource == null) {
@ -168,12 +163,11 @@ public class DataSourceService extends BaseService {
// connectionParams json // connectionParams json
String connectionParams = paramObject.toString(); String connectionParams = paramObject.toString();
Boolean isConnection = checkConnection(type, connectionParams); Result<Object> isConnection = checkConnection(type, parameter);
if (!isConnection) { if (Status.SUCCESS.getCode() != isConnection.getCode()) {
logger.info("connect failed, type:{}, parameter:{}", type, parameter);
putMsg(result, Status.DATASOURCE_CONNECT_FAILED);
return result; return result;
} }
Date now = new Date(); Date now = new Date();
dataSource.setName(name.trim()); dataSource.setName(name.trim());
@ -192,7 +186,6 @@ public class DataSourceService extends BaseService {
return queryDataSource != null && queryDataSource.size() > 0; return queryDataSource != null && queryDataSource.size() > 0;
} }
/** /**
* updateProcessInstance datasource * updateProcessInstance datasource
* *
@ -360,8 +353,8 @@ public class DataSourceService extends BaseService {
* @param name datasource name * @param name datasource name
* @return true if data datasource not exists, otherwise return false * @return true if data datasource not exists, otherwise return false
*/ */
public Result verifyDataSourceName(String name) { public Result<Object> verifyDataSourceName(String name) {
Result result = new Result(); Result<Object> result = new Result<>();
List<DataSource> dataSourceList = dataSourceMapper.queryDataSourceByName(name); List<DataSource> dataSourceList = dataSourceMapper.queryDataSourceByName(name);
if (dataSourceList != null && dataSourceList.size() > 0) { if (dataSourceList != null && dataSourceList.size() > 0) {
logger.error("datasource name:{} has exist, can't create again.", name); logger.error("datasource name:{} has exist, can't create again.", name);
@ -373,77 +366,6 @@ public class DataSourceService extends BaseService {
return result; return result;
} }
/**
* get connection
*
* @param dbType datasource type
* @param parameter parameter
* @return connection for datasource
*/
private Connection getConnection(DbType dbType, String parameter) {
Connection connection = null;
BaseDataSource datasource = null;
try {
switch (dbType) {
case POSTGRESQL:
datasource = JSONUtils.parseObject(parameter, PostgreDataSource.class);
Class.forName(Constants.ORG_POSTGRESQL_DRIVER);
break;
case MYSQL:
datasource = JSONUtils.parseObject(parameter, MySQLDataSource.class);
Class.forName(Constants.COM_MYSQL_JDBC_DRIVER);
break;
case HIVE:
case SPARK:
if (CommonUtils.getKerberosStartupState()) {
System.setProperty(org.apache.dolphinscheduler.common.Constants.JAVA_SECURITY_KRB5_CONF,
getString(org.apache.dolphinscheduler.common.Constants.JAVA_SECURITY_KRB5_CONF_PATH));
Configuration configuration = new Configuration();
configuration.set(org.apache.dolphinscheduler.common.Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(getString(org.apache.dolphinscheduler.common.Constants.LOGIN_USER_KEY_TAB_USERNAME),
getString(org.apache.dolphinscheduler.common.Constants.LOGIN_USER_KEY_TAB_PATH));
}
if (dbType == DbType.HIVE) {
datasource = JSONUtils.parseObject(parameter, HiveDataSource.class);
} else if (dbType == DbType.SPARK) {
datasource = JSONUtils.parseObject(parameter, SparkDataSource.class);
}
Class.forName(Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER);
break;
case CLICKHOUSE:
datasource = JSONUtils.parseObject(parameter, ClickHouseDataSource.class);
Class.forName(Constants.COM_CLICKHOUSE_JDBC_DRIVER);
break;
case ORACLE:
datasource = JSONUtils.parseObject(parameter, OracleDataSource.class);
Class.forName(Constants.COM_ORACLE_JDBC_DRIVER);
break;
case SQLSERVER:
datasource = JSONUtils.parseObject(parameter, SQLServerDataSource.class);
Class.forName(Constants.COM_SQLSERVER_JDBC_DRIVER);
break;
case DB2:
datasource = JSONUtils.parseObject(parameter, DB2ServerDataSource.class);
Class.forName(Constants.COM_DB2_JDBC_DRIVER);
break;
case PRESTO:
datasource = JSONUtils.parseObject(parameter, PrestoDataSource.class);
Class.forName(Constants.COM_PRESTO_JDBC_DRIVER);
break;
default:
break;
}
if (datasource != null) {
connection = DriverManager.getConnection(datasource.getJdbcUrl(), datasource.getUser(), datasource.getPassword());
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return connection;
}
/** /**
* check connection * check connection
* *
@ -451,18 +373,24 @@ public class DataSourceService extends BaseService {
* @param parameter data source parameters * @param parameter data source parameters
* @return true if connect successfully, otherwise false * @return true if connect successfully, otherwise false
*/ */
public boolean checkConnection(DbType type, String parameter) { public Result<Object> checkConnection(DbType type, String parameter) {
Boolean isConnection = false; Result<Object> result = new Result<>();
Connection con = getConnection(type, parameter); BaseDataSource datasource = DataSourceFactory.getDatasource(type, parameter);
if (con != null) { if (datasource == null) {
isConnection = true; putMsg(result, Status.DATASOURCE_TYPE_NOT_EXIST, type);
try { return result;
con.close(); }
} catch (SQLException e) { try (Connection connection = datasource.getConnection()) {
logger.error("close connection fail at DataSourceService::checkConnection()", e); if (connection == null) {
putMsg(result, Status.CONNECTION_TEST_FAILURE);
return result;
} }
putMsg(result, Status.SUCCESS);
return result;
} catch (Exception e) {
logger.error("datasource test connection error, dbType:{}, jdbcUrl:{}, message:{}.", type, datasource.getJdbcUrl(), e.getMessage());
return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(),e.getMessage());
} }
return isConnection;
} }
/** /**
@ -471,13 +399,14 @@ public class DataSourceService extends BaseService {
* @param id datasource id * @param id datasource id
* @return connect result code * @return connect result code
*/ */
public boolean connectionTest(int id) { public Result<Object> connectionTest(int id) {
DataSource dataSource = dataSourceMapper.selectById(id); DataSource dataSource = dataSourceMapper.selectById(id);
if (dataSource != null) { if (dataSource == null) {
return checkConnection(dataSource.getType(), dataSource.getConnectionParams()); Result<Object> result = new Result<>();
} else { putMsg(result, Status.RESOURCE_NOT_EXIST);
return false; return result;
} }
return checkConnection(dataSource.getType(), dataSource.getConnectionParams());
} }
/** /**
@ -510,8 +439,8 @@ public class DataSourceService extends BaseService {
parameterMap.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType); parameterMap.put(Constants.ORACLE_DB_CONNECT_TYPE, connectType);
} }
if (CommonUtils.getKerberosStartupState() && if (CommonUtils.getKerberosStartupState()
(type == DbType.HIVE || type == DbType.SPARK)) { && (type == DbType.HIVE || type == DbType.SPARK)) {
jdbcUrl += ";principal=" + principal; jdbcUrl += ";principal=" + principal;
} }
@ -535,8 +464,8 @@ public class DataSourceService extends BaseService {
parameterMap.put(Constants.JDBC_URL, jdbcUrl); parameterMap.put(Constants.JDBC_URL, jdbcUrl);
parameterMap.put(Constants.USER, userName); parameterMap.put(Constants.USER, userName);
parameterMap.put(Constants.PASSWORD, CommonUtils.encodePassword(password)); parameterMap.put(Constants.PASSWORD, CommonUtils.encodePassword(password));
if (CommonUtils.getKerberosStartupState() && if (CommonUtils.getKerberosStartupState()
(type == DbType.HIVE || type == DbType.SPARK)) { && (type == DbType.HIVE || type == DbType.SPARK)) {
parameterMap.put(Constants.PRINCIPAL, principal); parameterMap.put(Constants.PRINCIPAL, principal);
} }
@ -557,7 +486,6 @@ public class DataSourceService extends BaseService {
} }
return JSONUtils.toJsonString(parameterMap); return JSONUtils.toJsonString(parameterMap);
} }
private String buildAddress(DbType type, String host, String port, DbConnectType connectType) { private String buildAddress(DbType type, String host, String port, DbConnectType connectType) {
@ -609,8 +537,8 @@ public class DataSourceService extends BaseService {
* @return delete result code * @return delete result code
*/ */
@Transactional(rollbackFor = RuntimeException.class) @Transactional(rollbackFor = RuntimeException.class)
public Result delete(User loginUser, int datasourceId) { public Result<Object> delete(User loginUser, int datasourceId) {
Result result = new Result(); Result<Object> result = new Result<>();
try { try {
//query datasource by id //query datasource by id
DataSource dataSource = dataSourceMapper.selectById(datasourceId); DataSource dataSource = dataSourceMapper.selectById(datasourceId);
@ -673,7 +601,6 @@ public class DataSourceService extends BaseService {
return result; return result;
} }
/** /**
* authorized datasource * authorized datasource
* *
@ -695,7 +622,6 @@ public class DataSourceService extends BaseService {
return result; return result;
} }
/** /**
* get host and port by address * get host and port by address
* *

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/BaseServiceTest.java

@ -72,6 +72,8 @@ public class BaseServiceTest {
} }
@Test @Test
public void testPutMsg(){ public void testPutMsg(){

108
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.api.service; package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
@ -24,12 +25,19 @@ import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.datasource.MySQLDataSource; import org.apache.dolphinscheduler.dao.datasource.MySQLDataSource;
import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -38,16 +46,15 @@ import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito; import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner; import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@RunWith(PowerMockRunner.class) @RunWith(PowerMockRunner.class)
@PowerMockIgnore({"sun.security.*", "javax.net.*"}) @PowerMockIgnore({"sun.security.*", "javax.net.*"})
@PrepareForTest({DataSourceFactory.class})
public class DataSourceServiceTest { public class DataSourceServiceTest {
@InjectMocks @InjectMocks
private DataSourceService dataSourceService; private DataSourceService dataSourceService;
@Mock @Mock
@ -69,28 +76,31 @@ public class DataSourceServiceTest {
dataSource.setName(dataSourceName); dataSource.setName(dataSourceName);
dataSourceList.add(dataSource); dataSourceList.add(dataSource);
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(dataSourceList); PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(dataSourceList);
Map<String, Object> dataSourceExitsResult = dataSourceService.createDataSource(loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter); Result dataSourceExitsResult = dataSourceService.createDataSource(loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter);
Assert.assertEquals(Status.DATASOURCE_EXIST, dataSourceExitsResult.get(Constants.STATUS)); Assert.assertEquals(Status.DATASOURCE_EXIST.getCode(), dataSourceExitsResult.getCode().intValue());
// data source exits // data source exits
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null); PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(false); Result connectionResult = new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(),Status.DATASOURCE_CONNECT_FAILED.getMsg());
Map<String, Object> connectFailedResult = dataSourceService.createDataSource(loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter); //PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult);
Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED, connectFailedResult.get(Constants.STATUS)); PowerMockito.doReturn(connectionResult).when(dataSourceService).checkConnection(dataSourceType, parameter);
Result connectFailedResult = dataSourceService.createDataSource(loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter);
Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED.getCode(), connectFailedResult.getCode().intValue());
// data source exits // data source exits
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null); PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(true); connectionResult = new Result(Status.SUCCESS.getCode(),Status.SUCCESS.getMsg());
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult);
PowerMockito.when(DataSourceFactory.getDatasource(dataSourceType, parameter)).thenReturn(null); PowerMockito.when(DataSourceFactory.getDatasource(dataSourceType, parameter)).thenReturn(null);
Map<String, Object> notValidError = dataSourceService.createDataSource(loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter); Result notValidError = dataSourceService.createDataSource(loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter);
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR, notValidError.get(Constants.STATUS)); Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), notValidError.getCode().intValue());
// success // success
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null); PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(true); PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult);
PowerMockito.when(DataSourceFactory.getDatasource(dataSourceType, parameter)).thenReturn(JSONUtils.parseObject(parameter, MySQLDataSource.class)); PowerMockito.when(DataSourceFactory.getDatasource(dataSourceType, parameter)).thenReturn(JSONUtils.parseObject(parameter, MySQLDataSource.class));
Map<String, Object> success = dataSourceService.createDataSource(loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter); Result success = dataSourceService.createDataSource(loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter);
Assert.assertEquals(Status.SUCCESS, success.get(Constants.STATUS)); Assert.assertEquals(Status.SUCCESS.getCode(), success.getCode().intValue());
} }
public void updateDataSourceTest() { public void updateDataSourceTest() {
@ -104,14 +114,14 @@ public class DataSourceServiceTest {
// data source not exits // data source not exits
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null); PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null);
Map<String, Object> resourceNotExits = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter); Result resourceNotExits = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter);
Assert.assertEquals(Status.RESOURCE_NOT_EXIST, resourceNotExits.get(Constants.STATUS)); Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getCode(), resourceNotExits.getCode().intValue());
// user no operation perm // user no operation perm
DataSource dataSource = new DataSource(); DataSource dataSource = new DataSource();
dataSource.setUserId(0); dataSource.setUserId(0);
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource); PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
Map<String, Object> userNoOperationPerm = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter); Result userNoOperationPerm = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter);
Assert.assertEquals(Status.USER_NO_OPERATION_PERM, userNoOperationPerm.get(Constants.STATUS)); Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(), userNoOperationPerm.getCode().intValue());
// data source name exits // data source name exits
dataSource.setUserId(-1); dataSource.setUserId(-1);
@ -119,22 +129,24 @@ public class DataSourceServiceTest {
dataSourceList.add(dataSource); dataSourceList.add(dataSource);
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource); PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(dataSourceList); PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(dataSourceList);
Map<String, Object> dataSourceNameExist = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter); Result dataSourceNameExist = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter);
Assert.assertEquals(Status.DATASOURCE_EXIST, dataSourceNameExist.get(Constants.STATUS)); Assert.assertEquals(Status.DATASOURCE_EXIST.getCode(), dataSourceNameExist.getCode().intValue());
// data source connect failed // data source connect failed
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource); PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null); PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null);
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(true); Result connectionResult = new Result(Status.SUCCESS.getCode(),Status.SUCCESS.getMsg());
Map<String, Object> connectFailed = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter); PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult);
Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED, connectFailed.get(Constants.STATUS)); Result connectFailed = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter);
Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED.getCode(), connectFailed.getCode().intValue());
//success //success
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource); PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null); PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null);
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(false); connectionResult = new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(),Status.DATASOURCE_CONNECT_FAILED.getMsg());
Map<String, Object> success = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter); PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult);
Assert.assertEquals(Status.SUCCESS, connectFailed.get(Constants.STATUS)); Result success = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter);
Assert.assertEquals(Status.SUCCESS.getCode(), success.getCode().intValue());
} }
@ -152,7 +164,8 @@ public class DataSourceServiceTest {
public void connectionTest() { public void connectionTest() {
int dataSourceId = -1; int dataSourceId = -1;
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null); PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null);
Assert.assertFalse(dataSourceService.connectionTest(dataSourceId)); Result result = dataSourceService.connectionTest(dataSourceId);
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getCode(),result.getCode().intValue());
} }
@Test @Test
@ -252,7 +265,8 @@ public class DataSourceServiceTest {
dataSource.setName("test"); dataSource.setName("test");
dataSource.setNote("Note"); dataSource.setNote("Note");
dataSource.setType(DbType.ORACLE); dataSource.setType(DbType.ORACLE);
dataSource.setConnectionParams("{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\",\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}"); dataSource.setConnectionParams("{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
+ "\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
return dataSource; return dataSource;
} }
@ -261,7 +275,8 @@ public class DataSourceServiceTest {
public void buildParameter() { public void buildParameter() {
String param = dataSourceService.buildParameter(DbType.ORACLE, "192.168.9.1", "1521", "im" String param = dataSourceService.buildParameter(DbType.ORACLE, "192.168.9.1", "1521", "im"
, "", "test", "test", DbConnectType.ORACLE_SERVICE_NAME, ""); , "", "test", "test", DbConnectType.ORACLE_SERVICE_NAME, "");
String expected = "{\"connectType\":\"ORACLE_SERVICE_NAME\",\"type\":\"ORACLE_SERVICE_NAME\",\"address\":\"jdbc:oracle:thin:@//192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:oracle:thin:@//192.168.9.1:1521/im\",\"user\":\"test\",\"password\":\"test\"}"; String expected = "{\"connectType\":\"ORACLE_SERVICE_NAME\",\"type\":\"ORACLE_SERVICE_NAME\",\"address\":\"jdbc:oracle:thin:@//192.168.9.1:1521\",\"database\":\"im\","
+ "\"jdbcUrl\":\"jdbc:oracle:thin:@//192.168.9.1:1521/im\",\"user\":\"test\",\"password\":\"test\"}";
Assert.assertEquals(expected, param); Assert.assertEquals(expected, param);
} }
@ -270,10 +285,10 @@ public class DataSourceServiceTest {
PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE, "true"); PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE, "true");
String param = dataSourceService.buildParameter(DbType.MYSQL, "192.168.9.1", "1521", "im" String param = dataSourceService.buildParameter(DbType.MYSQL, "192.168.9.1", "1521", "im"
, "", "test", "123456", null, ""); , "", "test", "123456", null, "");
String expected = "{\"type\":null,\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/im\",\"user\":\"test\",\"password\":\"IUAjJCVeJipNVEl6TkRVMg==\"}"; String expected = "{\"type\":null,\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/im\","
+ "\"user\":\"test\",\"password\":\"IUAjJCVeJipNVEl6TkRVMg==\"}";
Assert.assertEquals(expected, param); Assert.assertEquals(expected, param);
PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE, "false"); PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE, "false");
param = dataSourceService.buildParameter(DbType.MYSQL, "192.168.9.1", "1521", "im" param = dataSourceService.buildParameter(DbType.MYSQL, "192.168.9.1", "1521", "im"
, "", "test", "123456", null, ""); , "", "test", "123456", null, "");
@ -294,4 +309,31 @@ public class DataSourceServiceTest {
return loginUser; return loginUser;
} }
} /**
* test check connection
* @throws Exception
*/
@Test
public void testCheckConnection() throws Exception {
DbType dataSourceType = DbType.POSTGRESQL;
String parameter = dataSourceService.buildParameter(dataSourceType, "172.16.133.200", "5432", "dolphinscheduler", null, "postgres", "", null, null);
PowerMockito.mockStatic(DataSourceFactory.class);
PowerMockito.when(DataSourceFactory.getDatasource(Mockito.any(), Mockito.anyString())).thenReturn(null);
Result result = dataSourceService.checkConnection(dataSourceType, parameter);
Assert.assertEquals(Status.DATASOURCE_TYPE_NOT_EXIST.getCode(), result.getCode().intValue());
BaseDataSource dataSource = PowerMockito.mock(BaseDataSource.class);
PowerMockito.when(DataSourceFactory.getDatasource(Mockito.any(), Mockito.anyString())).thenReturn(dataSource);
PowerMockito.when(dataSource.getConnection()).thenReturn(null);
result = dataSourceService.checkConnection(dataSourceType, parameter);
Assert.assertEquals(Status.CONNECTION_TEST_FAILURE.getCode(), result.getCode().intValue());
Connection connection = PowerMockito.mock(Connection.class);
PowerMockito.when(dataSource.getConnection()).thenReturn(connection);
result = dataSourceService.checkConnection(dataSourceType, parameter);
Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
}
}

14
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/StringUtils.java

@ -33,11 +33,17 @@ public class StringUtils {
return !isEmpty(cs); return !isEmpty(cs);
} }
public static boolean isBlank(String s) { public static boolean isBlank(String str) {
if (isEmpty(s)) { int strLen;
return true; if (str != null && (strLen = str.length()) != 0) {
for (int i = 0; i < strLen; ++i) {
if (!Character.isWhitespace(str.charAt(i))) {
return false;
}
}
} }
return s.trim().length() == 0; return true;
} }
public static boolean isNotBlank(String s) { public static boolean isNotBlank(String s) {

351
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/BaseDataSource.java

@ -14,14 +14,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.datasource; package org.apache.dolphinscheduler.dao.datasource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.sql.Connection;
import java.sql.DriverManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -30,194 +32,183 @@ import org.slf4j.LoggerFactory;
*/ */
public abstract class BaseDataSource { public abstract class BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(BaseDataSource.class); private static final Logger logger = LoggerFactory.getLogger(BaseDataSource.class);
/** /**
* user name * user name
*/ */
protected String user; protected String user;
/** /**
* user password * user password
*/ */
protected String password; protected String password;
/** /**
* data source address * data source address
*/ */
private String address; private String address;
/** /**
* database name * database name
*/ */
private String database; private String database;
/** /**
* other connection parameters for the data source * other connection parameters for the data source
*/ */
private String other; private String other;
/** /**
* principal * principal
*/ */
private String principal; private String principal;
public String getPrincipal() { public String getPrincipal() {
return principal; return principal;
} }
public void setPrincipal(String principal) { public void setPrincipal(String principal) {
this.principal = principal; this.principal = principal;
} }
/** /**
* @return driver class * @return driver class
*/ */
public abstract String driverClassSelector(); public abstract String driverClassSelector();
/** /**
* @return db type * @return db type
*/ */
public abstract DbType dbTypeSelector(); public abstract DbType dbTypeSelector();
/** /**
* gets the JDBC url for the data source connection * gets the JDBC url for the data source connection
* @return getJdbcUrl * @return getJdbcUrl
*/ */
public String getJdbcUrl() { public String getJdbcUrl() {
StringBuilder jdbcUrl = new StringBuilder(getAddress()); StringBuilder jdbcUrl = new StringBuilder(getAddress());
appendDatabase(jdbcUrl); appendDatabase(jdbcUrl);
appendPrincipal(jdbcUrl); appendPrincipal(jdbcUrl);
appendOther(jdbcUrl); appendOther(jdbcUrl);
return jdbcUrl.toString(); return jdbcUrl.toString();
} }
/** /**
* append database * append database
* @param jdbcUrl jdbc url * @param jdbcUrl jdbc url
*/ */
protected void appendDatabase(StringBuilder jdbcUrl) { protected void appendDatabase(StringBuilder jdbcUrl) {
if (dbTypeSelector() == DbType.SQLSERVER) { if (dbTypeSelector() == DbType.SQLSERVER) {
jdbcUrl.append(";databaseName=").append(getDatabase()); jdbcUrl.append(";databaseName=").append(getDatabase());
} else { } else {
if (getAddress().lastIndexOf('/') != (jdbcUrl.length() - 1)) { if (getAddress().lastIndexOf('/') != (jdbcUrl.length() - 1)) {
jdbcUrl.append("/"); jdbcUrl.append("/");
} }
jdbcUrl.append(getDatabase()); jdbcUrl.append(getDatabase());
} }
} }
/** /**
* append principal * append principal
* @param jdbcUrl jdbc url * @param jdbcUrl jdbc url
*/ */
private void appendPrincipal(StringBuilder jdbcUrl) { private void appendPrincipal(StringBuilder jdbcUrl) {
boolean tag = dbTypeSelector() == DbType.HIVE || dbTypeSelector() == DbType.SPARK; boolean tag = dbTypeSelector() == DbType.HIVE || dbTypeSelector() == DbType.SPARK;
if (tag && StringUtils.isNotEmpty(getPrincipal())) { if (tag && StringUtils.isNotEmpty(getPrincipal())) {
jdbcUrl.append(";principal=").append(getPrincipal()); jdbcUrl.append(";principal=").append(getPrincipal());
}
}
/**
* append other
* @param jdbcUrl jdbc url
*/
private void appendOther(StringBuilder jdbcUrl) {
String otherParams = filterOther(getOther());
if (StringUtils.isNotEmpty(otherParams)) {
String separator = "";
switch (dbTypeSelector()) {
case CLICKHOUSE:
case MYSQL:
case ORACLE:
case POSTGRESQL:
case PRESTO:
separator = "?";
break;
case DB2:
separator = ":";
break;
case HIVE:
case SPARK:
case SQLSERVER:
separator = ";";
break;
default:
logger.error("Db type mismatch!");
}
jdbcUrl.append(separator).append(otherParams);
}
}
protected String filterOther(String otherParams){
return otherParams;
}
/**
* test whether the data source can be connected successfully
*/
public void isConnectable() {
Connection con = null;
try {
Class.forName(driverClassSelector());
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} catch (ClassNotFoundException | SQLException e) {
logger.error("Get connection error: {}", e.getMessage());
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error(e.getMessage(), e);
} }
}
} }
}
public String getUser() { /**
return user; * append other
} * @param jdbcUrl jdbc url
*/
private void appendOther(StringBuilder jdbcUrl) {
String otherParams = filterOther(getOther());
if (StringUtils.isNotEmpty(otherParams)) {
String separator = "";
switch (dbTypeSelector()) {
case CLICKHOUSE:
case MYSQL:
case ORACLE:
case POSTGRESQL:
case PRESTO:
separator = "?";
break;
case DB2:
separator = ":";
break;
case HIVE:
case SPARK:
case SQLSERVER:
separator = ";";
break;
default:
logger.error("Db type mismatch!");
}
jdbcUrl.append(separator).append(otherParams);
}
}
/**
* the data source test connection
* @return Connection Connection
* @throws Exception Exception
*/
public Connection getConnection() throws Exception {
Class.forName(driverClassSelector());
return DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
}
protected String filterOther(String otherParams) {
return otherParams;
}
public String getUser() {
return user;
}
public void setUser(String user) { public void setUser(String user) {
this.user = user; this.user = user;
} }
/** /**
* password need decode * password need decode
* @return * @return
*/ */
public String getPassword() { public String getPassword() {
return CommonUtils.decodePassword(password); return CommonUtils.decodePassword(password);
} }
public void setPassword(String password) { public void setPassword(String password) {
this.password = password; this.password = password;
} }
public void setAddress(String address) { public void setAddress(String address) {
this.address = address; this.address = address;
} }
public String getAddress() { public String getAddress() {
return address; return address;
} }
public String getDatabase() { public String getDatabase() {
return database; return database;
} }
public void setDatabase(String database) { public void setDatabase(String database) {
this.database = database; this.database = database;
} }
public String getOther() { public String getOther() {
return other; return other;
} }
public void setOther(String other) { public void setOther(String other) {
this.other = other; this.other = other;
} }
} }

124
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/HiveDataSource.java

@ -19,78 +19,92 @@ package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import java.sql.Connection;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
/** /**
* data source of hive * data source of hive
*/ */
public class HiveDataSource extends BaseDataSource { public class HiveDataSource extends BaseDataSource {
/** /**
* gets the JDBC url for the data source connection * gets the JDBC url for the data source connection
* @return jdbc url * @return jdbc url
*/ */
@Override @Override
public String driverClassSelector() { public String driverClassSelector() {
return Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER; return Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER;
}
/**
* @return db type
*/
@Override
public DbType dbTypeSelector() {
return DbType.HIVE;
}
/**
* build hive jdbc params,append : ?hive_conf_list
*
* hive jdbc url template:
*
* jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;initFile=<file>;sess_var_list?hive_conf_list#hive_var_list
*
* @param otherParams otherParams
* @return filter otherParams
*/
@Override
protected String filterOther(String otherParams) {
if (StringUtils.isBlank(otherParams)) {
return "";
} }
StringBuilder hiveConfListSb = new StringBuilder(); /**
hiveConfListSb.append("?"); * @return db type
StringBuilder sessionVarListSb = new StringBuilder(); */
@Override
public DbType dbTypeSelector() {
return DbType.HIVE;
}
String[] otherArray = otherParams.split(";", -1); /**
* build hive jdbc params,append : ?hive_conf_list
*
* hive jdbc url template:
*
* jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;initFile=<file>;sess_var_list?hive_conf_list#hive_var_list
*
* @param otherParams otherParams
* @return filter otherParams
*/
@Override
protected String filterOther(String otherParams) {
if (StringUtils.isBlank(otherParams)) {
return "";
}
// get the default hive conf var name StringBuilder hiveConfListSb = new StringBuilder();
Set<String> hiveConfSet = Stream.of(ConfVars.values()).map(confVars -> confVars.varname) hiveConfListSb.append("?");
.collect(Collectors.toSet()); StringBuilder sessionVarListSb = new StringBuilder();
for (String conf : otherArray) { String[] otherArray = otherParams.split(";", -1);
if (hiveConfSet.contains(conf.split("=")[0])) {
hiveConfListSb.append(conf).append(";");
} else {
sessionVarListSb.append(conf).append(";");
}
}
// remove the last ";" // get the default hive conf var name
if (sessionVarListSb.length() > 0) { Set<String> hiveConfSet = Stream.of(ConfVars.values()).map(confVars -> confVars.varname)
sessionVarListSb.deleteCharAt(sessionVarListSb.length() - 1); .collect(Collectors.toSet());
}
for (String conf : otherArray) {
if (hiveConfSet.contains(conf.split("=")[0])) {
hiveConfListSb.append(conf).append(";");
} else {
sessionVarListSb.append(conf).append(";");
}
}
// remove the last ";"
if (sessionVarListSb.length() > 0) {
sessionVarListSb.deleteCharAt(sessionVarListSb.length() - 1);
}
if (hiveConfListSb.length() > 0) { if (hiveConfListSb.length() > 0) {
hiveConfListSb.deleteCharAt(hiveConfListSb.length() - 1); hiveConfListSb.deleteCharAt(hiveConfListSb.length() - 1);
}
return sessionVarListSb.toString() + hiveConfListSb.toString();
} }
return sessionVarListSb.toString() + hiveConfListSb.toString(); /**
} * the data source test connection
* @return Connection Connection
* @throws Exception Exception
*/
@Override
public Connection getConnection() throws Exception {
CommonUtils.loadKerberosConf();
return super.getConnection();
}
} }

52
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SQLServerDataSource.java

@ -14,25 +14,18 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.datasource; package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/** /**
* data source of SQL Server * data source of SQL Server
*/ */
public class SQLServerDataSource extends BaseDataSource { public class SQLServerDataSource extends BaseDataSource {
private static final Logger logger = LoggerFactory.getLogger(SQLServerDataSource.class);
/** /**
* gets the JDBC url for the data source connection * gets the JDBC url for the data source connection
* @return jdbc url * @return jdbc url
@ -50,39 +43,18 @@ public class SQLServerDataSource extends BaseDataSource {
} }
/** /**
* test whether the data source can be connected successfully * @return driver class
*/ */
@Override @Override
public void isConnectable() { public String driverClassSelector() {
Connection con = null; return Constants.COM_SQLSERVER_JDBC_DRIVER;
try {
Class.forName(Constants.COM_SQLSERVER_JDBC_DRIVER);
con = DriverManager.getConnection(getJdbcUrl(), getUser(), getPassword());
} catch (Exception e) {
logger.error("error", e);
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
logger.error("SQL Server datasource try conn close conn error", e);
}
}
}
} }
/**
* @return driver class
*/
@Override
public String driverClassSelector() {
return Constants.COM_SQLSERVER_JDBC_DRIVER;
}
/** /**
* @return db type * @return db type
*/ */
@Override @Override
public DbType dbTypeSelector() { public DbType dbTypeSelector() {
return DbType.SQLSERVER; return DbType.SQLSERVER;
} }
} }

45
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SparkDataSource.java

@ -14,30 +14,45 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.datasource; package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import java.sql.Connection;
/** /**
* data source of spark * data source of spark
*/ */
public class SparkDataSource extends BaseDataSource { public class SparkDataSource extends BaseDataSource {
/** /**
* gets the JDBC url for the data source connection * gets the JDBC url for the data source connection
* @return jdbc url * @return jdbc url
*/ */
@Override @Override
public String driverClassSelector() { public String driverClassSelector() {
return Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER; return Constants.ORG_APACHE_HIVE_JDBC_HIVE_DRIVER;
} }
/**
* @return db type
*/
@Override
public DbType dbTypeSelector() {
return DbType.SPARK;
}
/** /**
* @return db type * the data source test connection
*/ * @return Connection Connection
@Override * @throws Exception Exception
public DbType dbTypeSelector() { */
return DbType.SPARK; @Override
} public Connection getConnection() throws Exception {
CommonUtils.loadKerberosConf();
return super.getConnection();
}
} }

Loading…
Cancel
Save