Browse Source

Use adhoc connection in sql/procedure task (#14670)

3.1.8-release
Wenjun Ruan 1 year ago committed by GitHub
parent
commit
8e058fc227
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 57
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
  2. 159
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
  3. 4
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
  4. 49
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
  5. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
  6. 61
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

57
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java

@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@ -70,8 +69,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* data source service impl
*/
@ -86,7 +83,6 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
@Autowired
private DataSourceUserMapper datasourceUserMapper;
private static final String TABLE = "TABLE";
private static final String VIEW = "VIEW";
private static final String[] TABLE_TYPES = new String[]{TABLE, VIEW};
@ -105,7 +101,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
public Result<Object> createDataSource(User loginUser, BaseDataSourceParamDTO datasourceParam) {
DataSourceUtils.checkDatasourceParam(datasourceParam);
Result<Object> result = new Result<>();
if (!canOperatorPermissions(loginUser,null, AuthorizationType.DATASOURCE, ApiFuncIdentificationConstant.DATASOURCE_CREATE_DATASOURCE)) {
if (!canOperatorPermissions(loginUser, null, AuthorizationType.DATASOURCE,
ApiFuncIdentificationConstant.DATASOURCE_CREATE_DATASOURCE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@ -114,7 +111,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
putMsg(result, Status.DATASOURCE_EXIST);
return result;
}
if(checkDescriptionLength(datasourceParam.getNote())){
if (checkDescriptionLength(datasourceParam.getNote())) {
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
@ -141,7 +138,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
try {
dataSourceMapper.insert(dataSource);
putMsg(result, Status.SUCCESS);
permissionPostHandle(AuthorizationType.DATASOURCE, loginUser.getId(), Collections.singletonList(dataSource.getId()), logger);
permissionPostHandle(AuthorizationType.DATASOURCE, loginUser.getId(),
Collections.singletonList(dataSource.getId()), logger);
} catch (DuplicateKeyException ex) {
logger.error("Create datasource error.", ex);
putMsg(result, Status.DATASOURCE_EXIST);
@ -168,22 +166,24 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
return result;
}
if (!canOperatorPermissions(loginUser,new Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE, DATASOURCE_UPDATE)) {
if (!canOperatorPermissions(loginUser, new Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE,
DATASOURCE_UPDATE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
//check name can use or not
// check name can use or not
if (!dataSource.getName().trim().equals(dataSource.getName()) && checkName(dataSource.getName())) {
putMsg(result, Status.DATASOURCE_EXIST);
return result;
}
if(checkDescriptionLength(dataSourceParam.getNote())){
if (checkDescriptionLength(dataSourceParam.getNote())) {
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
//check password,if the password is not updated, set to the old password.
BaseConnectionParam connectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dataSourceParam);
// check password,if the password is not updated, set to the old password.
BaseConnectionParam connectionParam =
(BaseConnectionParam) DataSourceUtils.buildConnectionParams(dataSourceParam);
String password = connectionParam.getPassword();
if (StringUtils.isBlank(password)) {
String oldConnectionParams = dataSource.getConnectionParams();
@ -262,9 +262,11 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
Page<DataSource> dataSourcePage = new Page<>(pageNo, pageSize);
PageInfo<DataSource> pageInfo = new PageInfo<>(pageNo, pageSize);
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, UserType.ADMIN_USER.equals(loginUser.getUserType()) ? 0 : loginUser.getId(), searchVal);
dataSourceList = dataSourceMapper.selectPaging(dataSourcePage,
UserType.ADMIN_USER.equals(loginUser.getUserType()) ? 0 : loginUser.getId(), searchVal);
} else {
Set<Integer> ids = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), logger);
Set<Integer> ids = resourcePermissionCheckService
.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), logger);
if (ids.isEmpty()) {
result.setData(pageInfo);
putMsg(result, Status.SUCCESS);
@ -318,13 +320,15 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
datasourceList = dataSourceMapper.queryDataSourceByType(0, type);
} else {
Set<Integer> ids = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), logger);
Set<Integer> ids = resourcePermissionCheckService
.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), logger);
if (ids.isEmpty()) {
result.put(Constants.DATA_LIST, Collections.emptyList());
putMsg(result, Status.SUCCESS);
return result;
}
datasourceList = dataSourceMapper.selectBatchIds(ids).stream().filter(dataSource -> dataSource.getType().getCode() == type).collect(Collectors.toList());
datasourceList = dataSourceMapper.selectBatchIds(ids).stream()
.filter(dataSource -> dataSource.getType().getCode() == type).collect(Collectors.toList());
}
result.put(Constants.DATA_LIST, datasourceList);
putMsg(result, Status.SUCCESS);
@ -361,7 +365,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
@Override
public Result<Object> checkConnection(DbType type, ConnectionParam connectionParam) {
Result<Object> result = new Result<>();
try (Connection connection = DataSourceClientProvider.getInstance().getConnection(type, connectionParam)) {
try (Connection connection = DataSourceUtils.getConnection(type, connectionParam)) {
if (connection == null) {
putMsg(result, Status.CONNECTION_TEST_FAILURE);
return result;
@ -372,7 +376,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
String message = Optional.of(e).map(Throwable::getCause)
.map(Throwable::getMessage)
.orElse(e.getMessage());
logger.error("datasource test connection error, dbType:{}, connectionParam:{}, message:{}.", type, connectionParam, message);
logger.error("datasource test connection error, dbType:{}, connectionParam:{}, message:{}.", type,
connectionParam, message);
return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(), message);
}
}
@ -391,7 +396,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
return checkConnection(dataSource.getType(), DataSourceUtils.buildConnectionParams(dataSource.getType(), dataSource.getConnectionParams()));
return checkConnection(dataSource.getType(),
DataSourceUtils.buildConnectionParams(dataSource.getType(), dataSource.getConnectionParams()));
}
/**
@ -406,14 +412,15 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
public Result<Object> delete(User loginUser, int datasourceId) {
Result<Object> result = new Result<>();
try {
//query datasource by id
// query datasource by id
DataSource dataSource = dataSourceMapper.selectById(datasourceId);
if (dataSource == null) {
logger.error("resource id {} not exist", datasourceId);
putMsg(result, Status.RESOURCE_NOT_EXIST);
return result;
}
if (!canOperatorPermissions(loginUser, new Object[]{dataSource.getId()},AuthorizationType.DATASOURCE,DATASOURCE_DELETE)) {
if (!canOperatorPermissions(loginUser, new Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE,
DATASOURCE_DELETE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
}
@ -438,7 +445,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
public Map<String, Object> unauthDatasource(User loginUser, Integer userId) {
Map<String, Object> result = new HashMap<>();
List<DataSource> datasourceList;
if (canOperatorPermissions(loginUser,null,AuthorizationType.DATASOURCE,null)) {
if (canOperatorPermissions(loginUser, null, AuthorizationType.DATASOURCE, null)) {
// admin gets all data sources except userId
datasourceList = dataSourceMapper.queryDatasourceExceptUserId(userId);
} else {
@ -519,7 +526,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
tables = metaData.getTables(
connectionParam.getDatabase(),
getDbSchemaPattern(dataSource.getType(),schema,connectionParam),
getDbSchemaPattern(dataSource.getType(), schema, connectionParam),
"%", TABLE_TYPES);
if (null == tables) {
putMsg(result, Status.GET_DATASOURCE_TABLES_ERROR);
@ -549,7 +556,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
}
@Override
public Map<String, Object> getTableColumns(Integer datasourceId,String tableName) {
public Map<String, Object> getTableColumns(Integer datasourceId, String tableName) {
Map<String, Object> result = new HashMap<>();
DataSource dataSource = dataSourceMapper.selectById(datasourceId);
@ -615,7 +622,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
return options;
}
private String getDbSchemaPattern(DbType dbType,String schema,BaseConnectionParam connectionParam) {
private String getDbSchemaPattern(DbType dbType, String schema, BaseConnectionParam connectionParam) {
if (dbType == null) {
return null;
}

159
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java

@ -21,15 +21,14 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DATASOURCE_LIST;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.DataSourceServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.constants.DataSourceConstants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
@ -39,10 +38,8 @@ import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
import org.apache.dolphinscheduler.plugin.datasource.hive.param.HiveDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.mysql.param.MySQLDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.oracle.param.OracleDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.postgresql.param.PostgreSQLDataSourceParamDTO;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbConnectType;
import org.apache.dolphinscheduler.spi.enums.DbType;
@ -51,9 +48,7 @@ import org.apache.commons.collections.CollectionUtils;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -61,11 +56,9 @@ import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
@ -81,6 +74,7 @@ import org.slf4j.LoggerFactory;
@PowerMockIgnore({"sun.security.*", "javax.net.*"})
@PrepareForTest({DataSourceUtils.class, CommonUtils.class, DataSourceClientProvider.class, PasswordUtils.class})
public class DataSourceServiceTest {
private static final Logger baseServiceLogger = LoggerFactory.getLogger(BaseServiceImpl.class);
private static final Logger logger = LoggerFactory.getLogger(DataSourceServiceTest.class);
private static final Logger dataSourceServiceLogger = LoggerFactory.getLogger(DataSourceServiceImpl.class);
@ -124,22 +118,26 @@ public class DataSourceServiceTest {
DbType dataSourceType = postgreSqlDatasourceParam.getType();
// data source exits
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
Result connectionResult = new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), Status.DATASOURCE_CONNECT_FAILED.getMsg());
//PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult);
PowerMockito.doReturn(connectionResult).when(dataSourceService).checkConnection(dataSourceType, connectionParam);
Result connectionResult =
new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), Status.DATASOURCE_CONNECT_FAILED.getMsg());
// PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult);
PowerMockito.doReturn(connectionResult).when(dataSourceService).checkConnection(dataSourceType,
connectionParam);
Result connectFailedResult = dataSourceService.createDataSource(loginUser, postgreSqlDatasourceParam);
Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED.getCode(), connectFailedResult.getCode().intValue());
// data source exits
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
connectionResult = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam)).thenReturn(connectionResult);
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam))
.thenReturn(connectionResult);
Result notValidError = dataSourceService.createDataSource(loginUser, postgreSqlDatasourceParam);
Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), notValidError.getCode().intValue());
// success
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam)).thenReturn(connectionResult);
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam))
.thenReturn(connectionResult);
Result success = dataSourceService.createDataSource(loginUser, postgreSqlDatasourceParam);
Assert.assertEquals(Status.SUCCESS.getCode(), success.getCode().intValue());
}
@ -162,13 +160,15 @@ public class DataSourceServiceTest {
// data source not exits
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null);
Result resourceNotExits = dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
Result resourceNotExits =
dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getCode(), resourceNotExits.getCode().intValue());
// user no operation perm
DataSource dataSource = new DataSource();
dataSource.setUserId(0);
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
Result userNoOperationPerm = dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
Result userNoOperationPerm =
dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(), userNoOperationPerm.getCode().intValue());
// data source name exits
@ -177,7 +177,8 @@ public class DataSourceServiceTest {
dataSourceList.add(dataSource);
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(dataSourceList);
Result dataSourceNameExist = dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
Result dataSourceNameExist =
dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
Assert.assertEquals(Status.DATASOURCE_EXIST.getCode(), dataSourceNameExist.getCode().intValue());
// data source connect failed
@ -186,15 +187,18 @@ public class DataSourceServiceTest {
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null);
Result connectionResult = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam)).thenReturn(connectionResult);
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam))
.thenReturn(connectionResult);
Result connectFailed = dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED.getCode(), connectFailed.getCode().intValue());
//success
// success
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null);
connectionResult = new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), Status.DATASOURCE_CONNECT_FAILED.getMsg());
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam)).thenReturn(connectionResult);
connectionResult =
new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), Status.DATASOURCE_CONNECT_FAILED.getMsg());
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam))
.thenReturn(connectionResult);
Result success = dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
Assert.assertEquals(Status.SUCCESS.getCode(), success.getCode().intValue());
@ -209,12 +213,15 @@ public class DataSourceServiceTest {
String searchVal = "";
int pageNo = 1;
int pageSize = 10;
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null, loginUser.getId(), DATASOURCE_LIST, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, null, loginUser.getId(), baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), baseServiceLogger)).thenReturn(ids);
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null,
loginUser.getId(), DATASOURCE_LIST, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, null,
loginUser.getId(), baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
loginUser.getId(), baseServiceLogger)).thenReturn(ids);
Result result = dataSourceService.queryDataSourceListPaging(loginUser, searchVal, pageNo, pageSize);
Assert.assertEquals(Status.SUCCESS.getCode(),(int)result.getCode());
Assert.assertEquals(Status.SUCCESS.getCode(), (int) result.getCode());
}
@Test
@ -230,9 +237,11 @@ public class DataSourceServiceTest {
User loginUser = getAdminUser();
int dataSourceId = 1;
Result result = new Result();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null, loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, new Object[]{dataSourceId}, -1, baseServiceLogger)).thenReturn(true);
//resource not exist
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null,
loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
new Object[]{dataSourceId}, -1, baseServiceLogger)).thenReturn(true);
// resource not exist
dataSourceService.putMsg(result, Status.RESOURCE_NOT_EXIST);
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null);
Assert.assertEquals(result.getCode(), dataSourceService.delete(loginUser, dataSourceId).getCode());
@ -250,8 +259,10 @@ public class DataSourceServiceTest {
loginUser.setUserType(UserType.ADMIN_USER);
loginUser.setId(1);
dataSource.setId(22);
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null, loginUser.getId(), DATASOURCE_DELETE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,new Object[]{dataSource.getId()} , 0, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null,
loginUser.getId(), DATASOURCE_DELETE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
new Object[]{dataSource.getId()}, 0, baseServiceLogger)).thenReturn(true);
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
Assert.assertEquals(result.getCode(), dataSourceService.delete(loginUser, dataSourceId).getCode());
@ -263,8 +274,10 @@ public class DataSourceServiceTest {
loginUser.setId(1);
loginUser.setUserType(UserType.ADMIN_USER);
int userId = 3;
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null, loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, null, 0, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null,
loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, null, 0,
baseServiceLogger)).thenReturn(true);
// test admin user
Mockito.when(dataSourceMapper.queryAuthedDatasource(userId)).thenReturn(getSingleDataSourceList());
Mockito.when(dataSourceMapper.queryDatasourceExceptUserId(userId)).thenReturn(getDataSourceList());
@ -276,7 +289,8 @@ public class DataSourceServiceTest {
// test non-admin user
loginUser.setId(2);
loginUser.setUserType(UserType.GENERAL_USER);
Mockito.when(dataSourceMapper.selectByMap(Collections.singletonMap("user_id", loginUser.getId()))).thenReturn(getDataSourceList());
Mockito.when(dataSourceMapper.selectByMap(Collections.singletonMap("user_id", loginUser.getId())))
.thenReturn(getDataSourceList());
result = dataSourceService.unauthDatasource(loginUser, userId);
logger.info(result.toString());
dataSources = (List<DataSource>) result.get(Constants.DATA_LIST);
@ -311,9 +325,12 @@ public class DataSourceServiceTest {
loginUser.setUserType(UserType.GENERAL_USER);
Set<Integer> dataSourceIds = new HashSet<>();
dataSourceIds.add(1);
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null, loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, null, 0, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), dataSourceServiceLogger)).thenReturn(dataSourceIds);
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null,
loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, null, 0,
baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
loginUser.getId(), dataSourceServiceLogger)).thenReturn(dataSourceIds);
DataSource dataSource = new DataSource();
dataSource.setType(DbType.MYSQL);
@ -361,8 +378,9 @@ public class DataSourceServiceTest {
dataSource.setName("test");
dataSource.setNote("Note");
dataSource.setType(DbType.ORACLE);
dataSource.setConnectionParams("{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
+ "\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
dataSource.setConnectionParams(
"{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
+ "\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
return dataSource;
}
@ -373,8 +391,9 @@ public class DataSourceServiceTest {
dataSource.setName("test");
dataSource.setNote("Note");
dataSource.setType(DbType.ORACLE);
dataSource.setConnectionParams("{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
+ "\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
dataSource.setConnectionParams(
"{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
+ "\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
return dataSource;
}
@ -390,8 +409,9 @@ public class DataSourceServiceTest {
oracleDatasourceParamDTO.setConnectType(DbConnectType.ORACLE_SERVICE_NAME);
ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(oracleDatasourceParamDTO);
String expected = "{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:oracle:thin:@//192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:oracle:thin:@//192.168.9.1:1521/im\","
+ "\"driverClassName\":\"oracle.jdbc.OracleDriver\",\"validationQuery\":\"select 1 from dual\",\"connectType\":\"ORACLE_SERVICE_NAME\"}";
String expected =
"{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:oracle:thin:@//192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:oracle:thin:@//192.168.9.1:1521/im\","
+ "\"driverClassName\":\"oracle.jdbc.OracleDriver\",\"validationQuery\":\"select 1 from dual\",\"connectType\":\"ORACLE_SERVICE_NAME\"}";
Assert.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
PowerMockito.mockStatic(CommonUtils.class);
@ -410,36 +430,38 @@ public class DataSourceServiceTest {
hiveDataSourceParamDTO.setLoginUserKeytabUsername("test2/hdfs-mycluster@ESZ.COM");
connectionParam = DataSourceUtils.buildConnectionParams(hiveDataSourceParamDTO);
expected = "{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:hive2://192.168.9.1:10000\",\"database\":\"im\","
+ "\"jdbcUrl\":\"jdbc:hive2://192.168.9.1:10000/im\",\"driverClassName\":\"org.apache.hive.jdbc.HiveDriver\",\"validationQuery\":\"select 1\","
+ "\"principal\":\"hive/hdfs-mycluster@ESZ.COM\",\"javaSecurityKrb5Conf\":\"/opt/krb5.conf\",\"loginUserKeytabUsername\":\"test2/hdfs-mycluster@ESZ.COM\","
+ "\"loginUserKeytabPath\":\"/opt/hdfs.headless.keytab\"}";
expected =
"{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:hive2://192.168.9.1:10000\",\"database\":\"im\","
+ "\"jdbcUrl\":\"jdbc:hive2://192.168.9.1:10000/im\",\"driverClassName\":\"org.apache.hive.jdbc.HiveDriver\",\"validationQuery\":\"select 1\","
+ "\"principal\":\"hive/hdfs-mycluster@ESZ.COM\",\"javaSecurityKrb5Conf\":\"/opt/krb5.conf\",\"loginUserKeytabUsername\":\"test2/hdfs-mycluster@ESZ.COM\","
+ "\"loginUserKeytabPath\":\"/opt/hdfs.headless.keytab\"}";
Assert.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
}
@Test
public void buildParameterWithDecodePassword() {
// try (MockedStatic<PropertyUtils> mockedStaticPropertyUtils = Mockito.mockStatic(PropertyUtils.class)) {
// mockedStaticPropertyUtils
// .when(() -> PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE, false))
// .thenReturn(true);
// Map<String, String> other = new HashMap<>();
// other.put("autoDeserialize", "yes");
// other.put("allowUrlInLocalInfile", "true");
// MySQLDataSourceParamDTO mysqlDatasourceParamDTO = new MySQLDataSourceParamDTO();
// mysqlDatasourceParamDTO.setHost("192.168.9.1");
// mysqlDatasourceParamDTO.setPort(1521);
// mysqlDatasourceParamDTO.setDatabase("im");
// mysqlDatasourceParamDTO.setUserName("test");
// mysqlDatasourceParamDTO.setPassword("123456");
// mysqlDatasourceParamDTO.setOther(other);
// ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(mysqlDatasourceParamDTO);
// String expected =
// "{\"user\":\"test\",\"password\":\"bnVsbE1USXpORFUy\",\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/"
// + "im\",\"driverClassName\":\"com.mysql.cj.jdbc.Driver\",\"validationQuery\":\"select 1\",\"props\":{\"autoDeserialize\":\"yes\",\"allowUrlInLocalInfile\":\"true\"}}";
// Assertions.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
// }
// try (MockedStatic<PropertyUtils> mockedStaticPropertyUtils = Mockito.mockStatic(PropertyUtils.class)) {
// mockedStaticPropertyUtils
// .when(() -> PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE, false))
// .thenReturn(true);
// Map<String, String> other = new HashMap<>();
// other.put("autoDeserialize", "yes");
// other.put("allowUrlInLocalInfile", "true");
// MySQLDataSourceParamDTO mysqlDatasourceParamDTO = new MySQLDataSourceParamDTO();
// mysqlDatasourceParamDTO.setHost("192.168.9.1");
// mysqlDatasourceParamDTO.setPort(1521);
// mysqlDatasourceParamDTO.setDatabase("im");
// mysqlDatasourceParamDTO.setUserName("test");
// mysqlDatasourceParamDTO.setPassword("123456");
// mysqlDatasourceParamDTO.setOther(other);
// ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(mysqlDatasourceParamDTO);
// String expected =
// "{\"user\":\"test\",\"password\":\"bnVsbE1USXpORFUy\",\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/"
// + "im\",\"driverClassName\":\"com.mysql.cj.jdbc.Driver\",\"validationQuery\":\"select
// 1\",\"props\":{\"autoDeserialize\":\"yes\",\"allowUrlInLocalInfile\":\"true\"}}";
// Assertions.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
// }
}
/**
@ -475,15 +497,12 @@ public class DataSourceServiceTest {
ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(postgreSqlDatasourceParam);
PowerMockito.mockStatic(DataSourceUtils.class);
PowerMockito.mockStatic(DataSourceClientProvider.class);
DataSourceClientProvider clientProvider = PowerMockito.mock(DataSourceClientProvider.class);
PowerMockito.when(DataSourceClientProvider.getInstance()).thenReturn(clientProvider);
PowerMockito.when(DataSourceUtils.getConnection(Mockito.any(), Mockito.any())).thenReturn(null);
Result result = dataSourceService.checkConnection(dataSourceType, connectionParam);
Assert.assertEquals(Status.CONNECTION_TEST_FAILURE.getCode(), result.getCode().intValue());
Connection connection = PowerMockito.mock(Connection.class);
PowerMockito.when(clientProvider.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection);
PowerMockito.when(DataSourceUtils.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection);
result = dataSourceService.checkConnection(dataSourceType, connectionParam);
Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());

4
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java

@ -79,8 +79,8 @@ public class CommonUtils {
* @param loginUserKeytabPath loginUserKeytabPath
* @throws IOException errors
*/
public static void loadKerberosConf(String javaSecurityKrb5Conf, String loginUserKeytabUsername,
String loginUserKeytabPath) throws IOException {
public static synchronized void loadKerberosConf(String javaSecurityKrb5Conf, String loginUserKeytabUsername,
String loginUserKeytabPath) throws IOException {
Configuration configuration = new Configuration();
configuration.setClassLoader(configuration.getClass().getClassLoader());
loadKerberosConf(javaSecurityKrb5Conf, loginUserKeytabUsername, loginUserKeytabPath, configuration);

49
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

@ -17,29 +17,15 @@
package org.apache.dolphinscheduler.plugin.task.datax;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelect;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@ -74,11 +60,23 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelect;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class DataxTask extends AbstractTask {
/**
* jvm parameters
*/
@ -141,7 +139,8 @@ public class DataxTask extends AbstractTask {
throw new RuntimeException("datax task params is not valid");
}
dataxTaskExecutionContext = dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
dataxTaskExecutionContext =
dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
}
/**
@ -195,8 +194,7 @@ public class DataxTask extends AbstractTask {
* @return datax json file name
* @throws Exception if error throws Exception
*/
private String buildDataxJsonFile(Map<String, Property> paramsMap)
throws Exception {
private String buildDataxJsonFile(Map<String, Property> paramsMap) throws Exception {
// generate json
String fileName = String.format("%s/%s_job.json",
taskExecutionContext.getExecutePath(),
@ -273,7 +271,8 @@ public class DataxTask extends AbstractTask {
ArrayNode tableArr = writerConn.putArray("table");
tableArr.add(dataXParameters.getTargetTable());
writerConn.put("jdbcUrl", DataSourceUtils.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()), dataTargetCfg));
writerConn.put("jdbcUrl",
DataSourceUtils.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()), dataTargetCfg));
writerConnArr.add(writerConn);
ObjectNode writerParam = JSONUtils.createObjectNode();
@ -379,8 +378,7 @@ public class DataxTask extends AbstractTask {
* @return shell command file name
* @throws Exception if error throws Exception
*/
private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap)
throws Exception {
private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap) throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s",
taskExecutionContext.getExecutePath(),
@ -468,7 +466,8 @@ public class DataxTask extends AbstractTask {
* @param sql sql for data synchronization
* @return Keyword converted column names
*/
private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg, String sql) {
private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg,
String sql) {
String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(sourceType, sql);
if (columnNames == null || columnNames.length == 0) {
@ -565,7 +564,7 @@ public class DataxTask extends AbstractTask {
sql = sql.replace(";", "");
try (
Connection connection = DataSourceClientProvider.getInstance().getConnection(sourceType, baseDataSource);
Connection connection = DataSourceUtils.getConnection(sourceType, baseDataSource);
PreparedStatement stmt = connection.prepareStatement(sql);
ResultSet resultSet = stmt.executeQuery()) {
@ -573,9 +572,9 @@ public class DataxTask extends AbstractTask {
int num = md.getColumnCount();
columnNames = new String[num];
for (int i = 1; i <= num; i++) {
columnNames[i - 1] = md.getColumnName(i).replace("t.","");
columnNames[i - 1] = md.getColumnName(i).replace("t.", "");
}
} catch (SQLException | ExecutionException e) {
} catch (SQLException e) {
logger.error(e.getMessage(), e);
return null;
}

3
dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
@ -108,7 +107,7 @@ public class ProcedureTask extends AbstractTask {
procedureTaskExecutionContext.getConnectionParams());
// get jdbc connection
connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
connection = DataSourceUtils.getConnection(dbType, connectionParam);
Map<Integer, Property> sqlParamsMap = new HashMap<>();
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap() == null ? Maps.newHashMap()

61
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -17,12 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.sql;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@ -44,8 +39,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.slf4j.Logger;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@ -65,6 +58,11 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class SqlTask extends AbstractTask {
/**
@ -86,7 +84,8 @@ public class SqlTask extends AbstractTask {
* create function format
* include replace here which can be compatible with more cases, for example a long-running Spark session in Kyuubi will keep its own temp functions instead of destroying them right away
*/
private static final String CREATE_OR_REPLACE_FUNCTION_FORMAT = "create or replace temporary function {0} as ''{1}''";
private static final String CREATE_OR_REPLACE_FUNCTION_FORMAT =
"create or replace temporary function {0} as ''{1}''";
/**
* default query sql limit
@ -110,7 +109,8 @@ public class SqlTask extends AbstractTask {
throw new RuntimeException("sql task params is not valid");
}
sqlTaskExecutionContext = sqlParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
sqlTaskExecutionContext =
sqlParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
}
@Override
@ -121,7 +121,8 @@ public class SqlTask extends AbstractTask {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
logger.info("Full sql parameters: {}", sqlParameters);
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}",
logger.info(
"sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}",
sqlParameters.getType(),
sqlParameters.getDatasource(),
sqlParameters.getSql(),
@ -139,10 +140,11 @@ public class SqlTask extends AbstractTask {
sqlTaskExecutionContext.getConnectionParams());
// ready to execute SQL and parameter entity Map
List<SqlBinds> mainStatementSqlBinds = SqlSplitter.split(sqlParameters.getSql(), sqlParameters.getSegmentSeparator())
.stream()
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
List<SqlBinds> mainStatementSqlBinds =
SqlSplitter.split(sqlParameters.getSql(), sqlParameters.getSegmentSeparator())
.stream()
.map(this::getSqlAndSqlParamsMap)
.collect(Collectors.toList());
List<SqlBinds> preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
.orElse(new ArrayList<>())
@ -190,7 +192,7 @@ public class SqlTask extends AbstractTask {
try {
// create connection
connection = DataSourceClientProvider.getInstance().getConnection(DbType.valueOf(sqlParameters.getType()), baseConnectionParam);
connection = DataSourceUtils.getConnection(DbType.valueOf(sqlParameters.getType()), baseConnectionParam);
// create temp function
if (CollectionUtils.isNotEmpty(createFuncs)) {
createTempFunction(connection, createFuncs);
@ -210,7 +212,7 @@ public class SqlTask extends AbstractTask {
String updateResult = executeUpdate(connection, mainStatementsBinds, "main");
result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams());
}
//deal out params
// deal out params
sqlParameters.dealOutParam(result);
// post execute
@ -265,7 +267,8 @@ public class SqlTask extends AbstractTask {
resultJSONArray.add(mapOfColValues);
rowCount++;
}
int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows() : TaskConstants.DEFAULT_DISPLAY_ROWS;
int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows()
: TaskConstants.DEFAULT_DISPLAY_ROWS;
displayRows = Math.min(displayRows, rowCount);
logger.info("display sql result {} rows as follows:", displayRows);
for (int i = 0; i < displayRows; i++) {
@ -305,12 +308,14 @@ public class SqlTask extends AbstractTask {
}
}
private String executeUpdate(Connection connection, List<SqlBinds> statementsBinds, String handlerType) throws Exception {
private String executeUpdate(Connection connection, List<SqlBinds> statementsBinds,
String handlerType) throws Exception {
int result = 0;
for (SqlBinds sqlBind : statementsBinds) {
try (PreparedStatement statement = prepareStatementAndBind(connection, sqlBind)) {
result = statement.executeUpdate();
logger.info("{} statement execute update result: {}, for sql: {}", handlerType, result, sqlBind.getSql());
logger.info("{} statement execute update result: {}, for sql: {}", handlerType, result,
sqlBind.getSql());
}
}
return String.valueOf(result);
@ -371,7 +376,8 @@ public class SqlTask extends AbstractTask {
ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
}
}
logger.info("prepare statement replace sql : {}, sql parameters : {}", sqlBinds.getSql(), sqlBinds.getParamsMap());
logger.info("prepare statement replace sql : {}, sql parameters : {}", sqlBinds.getSql(),
sqlBinds.getParamsMap());
return stmt;
} catch (Exception exception) {
throw new TaskException("SQL task prepareStatementAndBind error", exception);
@ -387,14 +393,15 @@ public class SqlTask extends AbstractTask {
* @param sqlParamsMap sql params map
*/
private void printReplacedSql(String content, String formatSql, String rgex, Map<Integer, Property> sqlParamsMap) {
//parameter print style
// parameter print style
logger.info("after replace sql , preparing : {}", formatSql);
StringBuilder logPrint = new StringBuilder("replaced sql , parameters:");
if (sqlParamsMap == null) {
logger.info("printReplacedSql: sqlParamsMap is null.");
} else {
for (int i = 1; i <= sqlParamsMap.size(); i++) {
logPrint.append(sqlParamsMap.get(i).getValue()).append("(").append(sqlParamsMap.get(i).getType()).append(")");
logPrint.append(sqlParamsMap.get(i).getValue()).append("(").append(sqlParamsMap.get(i).getType())
.append(")");
}
}
logger.info("Sql Params are {}", logPrint);
@ -428,8 +435,8 @@ public class SqlTask extends AbstractTask {
}
// special characters need to be escaped, ${} needs to be escaped
setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap,taskExecutionContext.getTaskInstanceId());
//Replace the original value in sql !{...} ,Does not participate in precompilation
setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId());
// Replace the original value in sql !{...} ,Does not participate in precompilation
String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*";
sql = replaceOriginalValue(sql, rgexo, paramsMap);
// replace the ${} of the SQL statement with the Placeholder
@ -485,7 +492,8 @@ public class SqlTask extends AbstractTask {
*/
private List<String> buildTempFuncSql(List<UdfFuncParameters> udfFuncParameters) {
return udfFuncParameters.stream().map(value -> MessageFormat
.format(CREATE_OR_REPLACE_FUNCTION_FORMAT, value.getFuncName(), value.getClassName())).collect(Collectors.toList());
.format(CREATE_OR_REPLACE_FUNCTION_FORMAT, value.getFuncName(), value.getClassName()))
.collect(Collectors.toList());
}
/**
@ -499,7 +507,8 @@ public class SqlTask extends AbstractTask {
String prefixPath = defaultFS.startsWith("file://") ? "file://" : defaultFS;
String uploadPath = CommonUtils.getHdfsUdfDir(value.getTenantCode());
String resourceFullName = value.getResourceName();
resourceFullName = resourceFullName.startsWith("/") ? resourceFullName : String.format("/%s", resourceFullName);
resourceFullName =
resourceFullName.startsWith("/") ? resourceFullName : String.format("/%s", resourceFullName);
return String.format("add jar %s%s%s", prefixPath, uploadPath, resourceFullName);
}).collect(Collectors.toList());
}

Loading…
Cancel
Save