diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
index 7080b83646..309d2ce01f 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
@@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
 import org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
-import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
@@ -70,8 +69,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
-
-
 /**
  * data source service impl
  */
@@ -86,7 +83,6 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
     @Autowired
     private DataSourceUserMapper datasourceUserMapper;
 
-
     private static final String TABLE = "TABLE";
     private static final String VIEW = "VIEW";
     private static final String[] TABLE_TYPES = new String[]{TABLE, VIEW};
@@ -105,7 +101,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
     public Result createDataSource(User loginUser, BaseDataSourceParamDTO datasourceParam) {
         DataSourceUtils.checkDatasourceParam(datasourceParam);
         Result result = new Result<>();
-        if (!canOperatorPermissions(loginUser,null, AuthorizationType.DATASOURCE, ApiFuncIdentificationConstant.DATASOURCE_CREATE_DATASOURCE)) {
+        if (!canOperatorPermissions(loginUser, null, AuthorizationType.DATASOURCE,
+                ApiFuncIdentificationConstant.DATASOURCE_CREATE_DATASOURCE)) {
             putMsg(result, Status.USER_NO_OPERATION_PERM);
             return result;
         }
@@ -114,7 +111,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
             putMsg(result, Status.DATASOURCE_EXIST);
             return result;
         }
-        if(checkDescriptionLength(datasourceParam.getNote())){
+        if (checkDescriptionLength(datasourceParam.getNote())) {
             putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
             return result;
         }
@@ -141,7 +138,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
         try {
             dataSourceMapper.insert(dataSource);
             putMsg(result, Status.SUCCESS);
-            permissionPostHandle(AuthorizationType.DATASOURCE, loginUser.getId(), Collections.singletonList(dataSource.getId()), logger);
+            permissionPostHandle(AuthorizationType.DATASOURCE, loginUser.getId(),
+                    Collections.singletonList(dataSource.getId()), logger);
         } catch (DuplicateKeyException ex) {
             logger.error("Create datasource error.", ex);
             putMsg(result, Status.DATASOURCE_EXIST);
@@ -168,22 +166,24 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
             return result;
         }
 
-        if (!canOperatorPermissions(loginUser,new Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE, DATASOURCE_UPDATE)) {
+        if (!canOperatorPermissions(loginUser, new Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE,
+                DATASOURCE_UPDATE)) {
             putMsg(result, Status.USER_NO_OPERATION_PERM);
             return result;
         }
 
-        //check name can use or not
+        // check name can use or not
         if (!dataSource.getName().trim().equals(dataSource.getName()) && checkName(dataSource.getName())) {
             putMsg(result, Status.DATASOURCE_EXIST);
             return result;
         }
-        if(checkDescriptionLength(dataSourceParam.getNote())){
+        if (checkDescriptionLength(dataSourceParam.getNote())) {
             putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
             return result;
         }
-        //check password,if the password is not updated, set to the old password.
-        BaseConnectionParam connectionParam = (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dataSourceParam);
+        // check password,if the password is not updated, set to the old password.
+        BaseConnectionParam connectionParam =
+                (BaseConnectionParam) DataSourceUtils.buildConnectionParams(dataSourceParam);
         String password = connectionParam.getPassword();
         if (StringUtils.isBlank(password)) {
             String oldConnectionParams = dataSource.getConnectionParams();
@@ -262,9 +262,11 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
         Page dataSourcePage = new Page<>(pageNo, pageSize);
         PageInfo pageInfo = new PageInfo<>(pageNo, pageSize);
         if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
-            dataSourceList = dataSourceMapper.selectPaging(dataSourcePage, UserType.ADMIN_USER.equals(loginUser.getUserType()) ? 0 : loginUser.getId(), searchVal);
+            dataSourceList = dataSourceMapper.selectPaging(dataSourcePage,
+                    UserType.ADMIN_USER.equals(loginUser.getUserType()) ? 0 : loginUser.getId(), searchVal);
         } else {
-            Set ids = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), logger);
+            Set ids = resourcePermissionCheckService
+                    .userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), logger);
             if (ids.isEmpty()) {
                 result.setData(pageInfo);
                 putMsg(result, Status.SUCCESS);
@@ -318,13 +320,15 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
         if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
             datasourceList = dataSourceMapper.queryDataSourceByType(0, type);
         } else {
-            Set ids = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), logger);
+            Set ids = resourcePermissionCheckService
+                    .userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), logger);
             if (ids.isEmpty()) {
                 result.put(Constants.DATA_LIST, Collections.emptyList());
                 putMsg(result, Status.SUCCESS);
                 return result;
             }
-            datasourceList = dataSourceMapper.selectBatchIds(ids).stream().filter(dataSource -> dataSource.getType().getCode() == type).collect(Collectors.toList());
+            datasourceList = dataSourceMapper.selectBatchIds(ids).stream()
+                    .filter(dataSource -> dataSource.getType().getCode() == type).collect(Collectors.toList());
         }
         result.put(Constants.DATA_LIST, datasourceList);
         putMsg(result, Status.SUCCESS);
@@ -361,7 +365,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
     @Override
     public Result checkConnection(DbType type, ConnectionParam connectionParam) {
         Result result = new Result<>();
-        try (Connection connection = DataSourceClientProvider.getInstance().getConnection(type, connectionParam)) {
+        try (Connection connection = DataSourceUtils.getConnection(type, connectionParam)) {
             if (connection == null) {
                 putMsg(result, Status.CONNECTION_TEST_FAILURE);
                 return result;
             }
@@ -372,7 +376,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
             String message = Optional.of(e).map(Throwable::getCause)
                     .map(Throwable::getMessage)
                     .orElse(e.getMessage());
-            logger.error("datasource test connection error, dbType:{}, connectionParam:{}, message:{}.", type, connectionParam, message);
+            logger.error("datasource test connection error, dbType:{}, connectionParam:{}, message:{}.", type,
+                    connectionParam, message);
             return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(), message);
         }
     }
@@ -391,7 +396,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
             putMsg(result, Status.RESOURCE_NOT_EXIST);
             return result;
         }
-        return checkConnection(dataSource.getType(), DataSourceUtils.buildConnectionParams(dataSource.getType(), dataSource.getConnectionParams()));
+        return checkConnection(dataSource.getType(),
+                DataSourceUtils.buildConnectionParams(dataSource.getType(), dataSource.getConnectionParams()));
     }
 
     /**
@@ -406,14 +412,15 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
     public Result delete(User loginUser, int datasourceId) {
         Result result = new Result<>();
         try {
-            //query datasource by id
+            // query datasource by id
             DataSource dataSource = dataSourceMapper.selectById(datasourceId);
             if (dataSource == null) {
                 logger.error("resource id {} not exist", datasourceId);
                 putMsg(result, Status.RESOURCE_NOT_EXIST);
                 return result;
             }
-            if (!canOperatorPermissions(loginUser, new Object[]{dataSource.getId()},AuthorizationType.DATASOURCE,DATASOURCE_DELETE)) {
+            if (!canOperatorPermissions(loginUser, new Object[]{dataSource.getId()}, AuthorizationType.DATASOURCE,
+                    DATASOURCE_DELETE)) {
                 putMsg(result, Status.USER_NO_OPERATION_PERM);
                 return result;
             }
@@ -438,7 +445,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
     public Map unauthDatasource(User loginUser, Integer userId) {
         Map result = new HashMap<>();
         List datasourceList;
-        if (canOperatorPermissions(loginUser,null,AuthorizationType.DATASOURCE,null)) {
+        if (canOperatorPermissions(loginUser, null, AuthorizationType.DATASOURCE, null)) {
             // admin gets all data sources except userId
             datasourceList = dataSourceMapper.queryDatasourceExceptUserId(userId);
         } else {
@@ -519,7 +526,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
 
             tables = metaData.getTables(
                     connectionParam.getDatabase(),
-                    getDbSchemaPattern(dataSource.getType(),schema,connectionParam),
+                    getDbSchemaPattern(dataSource.getType(), schema, connectionParam),
                     "%", TABLE_TYPES);
             if (null == tables) {
                 putMsg(result, Status.GET_DATASOURCE_TABLES_ERROR);
@@ -549,7 +556,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
     }
 
     @Override
-    public Map getTableColumns(Integer datasourceId,String tableName) {
+    public Map getTableColumns(Integer datasourceId, String tableName) {
         Map result = new HashMap<>();
 
         DataSource dataSource = dataSourceMapper.selectById(datasourceId);
@@ -615,7 +622,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
         return options;
     }
 
-    private String getDbSchemaPattern(DbType dbType,String schema,BaseConnectionParam connectionParam) {
+    private String getDbSchemaPattern(DbType dbType, String schema, BaseConnectionParam connectionParam) {
         if (dbType == null) {
             return null;
         }
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
index 2c46170618..30b9cbd150 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
@@ -21,15 +21,14 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
 import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DATASOURCE_LIST;
 
 import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
 import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
 import org.apache.dolphinscheduler.api.service.impl.DataSourceServiceImpl;
 import org.apache.dolphinscheduler.api.utils.Result;
 import org.apache.dolphinscheduler.common.constants.Constants;
-import org.apache.dolphinscheduler.common.constants.DataSourceConstants;
 import org.apache.dolphinscheduler.common.enums.AuthorizationType;
 import org.apache.dolphinscheduler.common.enums.UserType;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.PropertyUtils;
 import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.dao.entity.User;
 import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
@@ -39,10 +38,8 @@ import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils;
 import org.apache.dolphinscheduler.plugin.datasource.hive.param.HiveDataSourceParamDTO;
-import org.apache.dolphinscheduler.plugin.datasource.mysql.param.MySQLDataSourceParamDTO;
 import org.apache.dolphinscheduler.plugin.datasource.oracle.param.OracleDataSourceParamDTO;
 import org.apache.dolphinscheduler.plugin.datasource.postgresql.param.PostgreSQLDataSourceParamDTO;
-import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
 import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbConnectType;
 import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -51,9 +48,7 @@ import org.apache.commons.collections.CollectionUtils;
 
 import java.sql.Connection;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -61,11 +56,9 @@ import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.jupiter.api.Assertions;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
-import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -81,6 +74,7 @@ import org.slf4j.LoggerFactory;
 @PowerMockIgnore({"sun.security.*", "javax.net.*"})
 @PrepareForTest({DataSourceUtils.class, CommonUtils.class, DataSourceClientProvider.class, PasswordUtils.class})
 public class DataSourceServiceTest {
+
     private static final Logger baseServiceLogger = LoggerFactory.getLogger(BaseServiceImpl.class);
     private static final Logger logger = LoggerFactory.getLogger(DataSourceServiceTest.class);
     private static final Logger dataSourceServiceLogger = LoggerFactory.getLogger(DataSourceServiceImpl.class);
@@ -124,22 +118,26 @@ public class DataSourceServiceTest {
         DbType dataSourceType = postgreSqlDatasourceParam.getType();
         // data source exits
         PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
-        Result connectionResult = new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), Status.DATASOURCE_CONNECT_FAILED.getMsg());
-        //PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult);
-        PowerMockito.doReturn(connectionResult).when(dataSourceService).checkConnection(dataSourceType, connectionParam);
+        Result connectionResult =
+                new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), Status.DATASOURCE_CONNECT_FAILED.getMsg());
+        // PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult);
+        PowerMockito.doReturn(connectionResult).when(dataSourceService).checkConnection(dataSourceType,
+                connectionParam);
         Result connectFailedResult = dataSourceService.createDataSource(loginUser, postgreSqlDatasourceParam);
         Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED.getCode(), connectFailedResult.getCode().intValue());
 
         // data source exits
         PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
         connectionResult = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
-        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam)).thenReturn(connectionResult);
+        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam))
+                .thenReturn(connectionResult);
         Result notValidError = dataSourceService.createDataSource(loginUser, postgreSqlDatasourceParam);
         Assert.assertEquals(Status.REQUEST_PARAMS_NOT_VALID_ERROR.getCode(), notValidError.getCode().intValue());
 
         // success
         PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
-        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam)).thenReturn(connectionResult);
+        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam))
+                .thenReturn(connectionResult);
         Result success = dataSourceService.createDataSource(loginUser, postgreSqlDatasourceParam);
         Assert.assertEquals(Status.SUCCESS.getCode(), success.getCode().intValue());
     }
@@ -162,13 +160,15 @@ public class DataSourceServiceTest {
         // data source not exits
         PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null);
-        Result resourceNotExits = dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
+        Result resourceNotExits =
+                dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
         Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getCode(), resourceNotExits.getCode().intValue());
 
         // user no operation perm
         DataSource dataSource = new DataSource();
         dataSource.setUserId(0);
         PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
-        Result userNoOperationPerm = dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
+        Result userNoOperationPerm =
+                dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
         Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(), userNoOperationPerm.getCode().intValue());
 
         // data source name exits
@@ -177,7 +177,8 @@ public class DataSourceServiceTest {
         dataSourceList.add(dataSource);
         PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
         PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(dataSourceList);
-        Result dataSourceNameExist = dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
+        Result dataSourceNameExist =
+                dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
         Assert.assertEquals(Status.DATASOURCE_EXIST.getCode(), dataSourceNameExist.getCode().intValue());
 
         // data source connect failed
@@ -186,15 +187,18 @@ public class DataSourceServiceTest {
         PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
         PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null);
         Result connectionResult = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
-        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam)).thenReturn(connectionResult);
+        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam))
+                .thenReturn(connectionResult);
         Result connectFailed = dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
         Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED.getCode(), connectFailed.getCode().intValue());
 
-        //success
+        // success
         PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
         PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null);
-        connectionResult = new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), Status.DATASOURCE_CONNECT_FAILED.getMsg());
-        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam)).thenReturn(connectionResult);
+        connectionResult =
+                new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), Status.DATASOURCE_CONNECT_FAILED.getMsg());
+        PowerMockito.when(dataSourceService.checkConnection(dataSourceType, connectionParam))
+                .thenReturn(connectionResult);
         Result success = dataSourceService.updateDataSource(dataSourceId, loginUser, postgreSqlDatasourceParam);
         Assert.assertEquals(Status.SUCCESS.getCode(), success.getCode().intValue());
 
@@ -209,12 +213,15 @@ public class DataSourceServiceTest {
         String searchVal = "";
         int pageNo = 1;
         int pageSize = 10;
-        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null, loginUser.getId(), DATASOURCE_LIST, baseServiceLogger)).thenReturn(true);
-        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, null, loginUser.getId(), baseServiceLogger)).thenReturn(true);
-        Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), baseServiceLogger)).thenReturn(ids);
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null,
+                loginUser.getId(), DATASOURCE_LIST, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, null,
+                loginUser.getId(), baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
+                loginUser.getId(), baseServiceLogger)).thenReturn(ids);
 
         Result result = dataSourceService.queryDataSourceListPaging(loginUser, searchVal, pageNo, pageSize);
-        Assert.assertEquals(Status.SUCCESS.getCode(),(int)result.getCode());
+        Assert.assertEquals(Status.SUCCESS.getCode(), (int) result.getCode());
     }
 
     @Test
@@ -230,9 +237,11 @@ public class DataSourceServiceTest {
         User loginUser = getAdminUser();
         int dataSourceId = 1;
         Result result = new Result();
-        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null, loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
-        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, new Object[]{dataSourceId}, -1, baseServiceLogger)).thenReturn(true);
-        //resource not exist
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null,
+                loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
+                new Object[]{dataSourceId}, -1, baseServiceLogger)).thenReturn(true);
+        // resource not exist
         dataSourceService.putMsg(result, Status.RESOURCE_NOT_EXIST);
         PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null);
         Assert.assertEquals(result.getCode(), dataSourceService.delete(loginUser, dataSourceId).getCode());
@@ -250,8 +259,10 @@ public class DataSourceServiceTest {
         loginUser.setUserType(UserType.ADMIN_USER);
         loginUser.setId(1);
         dataSource.setId(22);
-        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null, loginUser.getId(), DATASOURCE_DELETE, baseServiceLogger)).thenReturn(true);
-        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,new Object[]{dataSource.getId()} , 0, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null,
+                loginUser.getId(), DATASOURCE_DELETE, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE,
+                new Object[]{dataSource.getId()}, 0, baseServiceLogger)).thenReturn(true);
         PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
         Assert.assertEquals(result.getCode(), dataSourceService.delete(loginUser, dataSourceId).getCode());
 
@@ -263,8 +274,10 @@ public class DataSourceServiceTest {
         loginUser.setId(1);
         loginUser.setUserType(UserType.ADMIN_USER);
         int userId = 3;
-        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null, loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
-        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, null, 0, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null,
+                loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, null, 0,
+                baseServiceLogger)).thenReturn(true);
         // test admin user
         Mockito.when(dataSourceMapper.queryAuthedDatasource(userId)).thenReturn(getSingleDataSourceList());
         Mockito.when(dataSourceMapper.queryDatasourceExceptUserId(userId)).thenReturn(getDataSourceList());
@@ -276,7 +289,8 @@ public class DataSourceServiceTest {
         // test non-admin user
         loginUser.setId(2);
         loginUser.setUserType(UserType.GENERAL_USER);
-        Mockito.when(dataSourceMapper.selectByMap(Collections.singletonMap("user_id", loginUser.getId()))).thenReturn(getDataSourceList());
+        Mockito.when(dataSourceMapper.selectByMap(Collections.singletonMap("user_id", loginUser.getId())))
+                .thenReturn(getDataSourceList());
         result = dataSourceService.unauthDatasource(loginUser, userId);
         logger.info(result.toString());
         dataSources = (List) result.get(Constants.DATA_LIST);
@@ -311,9 +325,12 @@ public class DataSourceServiceTest {
         loginUser.setUserType(UserType.GENERAL_USER);
         Set dataSourceIds = new HashSet<>();
         dataSourceIds.add(1);
-        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null, loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
-        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, null, 0, baseServiceLogger)).thenReturn(true);
-        Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), dataSourceServiceLogger)).thenReturn(dataSourceIds);
+        Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.DATASOURCE, null,
+                loginUser.getId(), null, baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.DATASOURCE, null, 0,
+                baseServiceLogger)).thenReturn(true);
+        Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE,
+                loginUser.getId(), dataSourceServiceLogger)).thenReturn(dataSourceIds);
 
         DataSource dataSource = new DataSource();
         dataSource.setType(DbType.MYSQL);
@@ -361,8 +378,9 @@ public class DataSourceServiceTest {
         dataSource.setName("test");
         dataSource.setNote("Note");
         dataSource.setType(DbType.ORACLE);
-        dataSource.setConnectionParams("{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
-                + "\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
+        dataSource.setConnectionParams(
+                "{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
+                        + "\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
         return dataSource;
     }
 
@@ -373,8 +391,9 @@ public class DataSourceServiceTest {
         dataSource.setName("test");
         dataSource.setNote("Note");
         dataSource.setType(DbType.ORACLE);
-        dataSource.setConnectionParams("{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
-                + "\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
+        dataSource.setConnectionParams(
+                "{\"connectType\":\"ORACLE_SID\",\"address\":\"jdbc:oracle:thin:@192.168.xx.xx:49161\",\"database\":\"XE\","
+                        + "\"jdbcUrl\":\"jdbc:oracle:thin:@192.168.xx.xx:49161/XE\",\"user\":\"system\",\"password\":\"oracle\"}");
         return dataSource;
     }
 
@@ -390,8 +409,9 @@ public class DataSourceServiceTest {
         oracleDatasourceParamDTO.setConnectType(DbConnectType.ORACLE_SERVICE_NAME);
 
         ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(oracleDatasourceParamDTO);
-        String expected = "{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:oracle:thin:@//192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:oracle:thin:@//192.168.9.1:1521/im\","
-                + "\"driverClassName\":\"oracle.jdbc.OracleDriver\",\"validationQuery\":\"select 1 from dual\",\"connectType\":\"ORACLE_SERVICE_NAME\"}";
+        String expected =
+                "{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:oracle:thin:@//192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:oracle:thin:@//192.168.9.1:1521/im\","
+                        + "\"driverClassName\":\"oracle.jdbc.OracleDriver\",\"validationQuery\":\"select 1 from dual\",\"connectType\":\"ORACLE_SERVICE_NAME\"}";
         Assert.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
 
         PowerMockito.mockStatic(CommonUtils.class);
@@ -410,36 +430,38 @@ public class DataSourceServiceTest {
         hiveDataSourceParamDTO.setLoginUserKeytabUsername("test2/hdfs-mycluster@ESZ.COM");
 
         connectionParam = DataSourceUtils.buildConnectionParams(hiveDataSourceParamDTO);
-        expected = "{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:hive2://192.168.9.1:10000\",\"database\":\"im\","
-                + "\"jdbcUrl\":\"jdbc:hive2://192.168.9.1:10000/im\",\"driverClassName\":\"org.apache.hive.jdbc.HiveDriver\",\"validationQuery\":\"select 1\","
-                + "\"principal\":\"hive/hdfs-mycluster@ESZ.COM\",\"javaSecurityKrb5Conf\":\"/opt/krb5.conf\",\"loginUserKeytabUsername\":\"test2/hdfs-mycluster@ESZ.COM\","
-                + "\"loginUserKeytabPath\":\"/opt/hdfs.headless.keytab\"}";
+        expected =
+                "{\"user\":\"test\",\"password\":\"test\",\"address\":\"jdbc:hive2://192.168.9.1:10000\",\"database\":\"im\","
+                        + "\"jdbcUrl\":\"jdbc:hive2://192.168.9.1:10000/im\",\"driverClassName\":\"org.apache.hive.jdbc.HiveDriver\",\"validationQuery\":\"select 1\","
+                        + "\"principal\":\"hive/hdfs-mycluster@ESZ.COM\",\"javaSecurityKrb5Conf\":\"/opt/krb5.conf\",\"loginUserKeytabUsername\":\"test2/hdfs-mycluster@ESZ.COM\","
+                        + "\"loginUserKeytabPath\":\"/opt/hdfs.headless.keytab\"}";
         Assert.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
     }
 
     @Test
     public void buildParameterWithDecodePassword() {
-// try (MockedStatic mockedStaticPropertyUtils = Mockito.mockStatic(PropertyUtils.class)) {
-// mockedStaticPropertyUtils
-// .when(() -> PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE, false))
-// .thenReturn(true);
-// Map other = new HashMap<>();
-// other.put("autoDeserialize", "yes");
-// other.put("allowUrlInLocalInfile", "true");
-// MySQLDataSourceParamDTO mysqlDatasourceParamDTO = new MySQLDataSourceParamDTO();
-// mysqlDatasourceParamDTO.setHost("192.168.9.1");
-// mysqlDatasourceParamDTO.setPort(1521);
-// mysqlDatasourceParamDTO.setDatabase("im");
-// mysqlDatasourceParamDTO.setUserName("test");
-// mysqlDatasourceParamDTO.setPassword("123456");
-// mysqlDatasourceParamDTO.setOther(other);
-// ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(mysqlDatasourceParamDTO);
-// String expected =
-// "{\"user\":\"test\",\"password\":\"bnVsbE1USXpORFUy\",\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/"
-// + "im\",\"driverClassName\":\"com.mysql.cj.jdbc.Driver\",\"validationQuery\":\"select 1\",\"props\":{\"autoDeserialize\":\"yes\",\"allowUrlInLocalInfile\":\"true\"}}";
-// Assertions.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
-// }
+        // try (MockedStatic mockedStaticPropertyUtils = Mockito.mockStatic(PropertyUtils.class)) {
+        // mockedStaticPropertyUtils
+        // .when(() -> PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE, false))
+        // .thenReturn(true);
+        // Map other = new HashMap<>();
+        // other.put("autoDeserialize", "yes");
+        // other.put("allowUrlInLocalInfile", "true");
+        // MySQLDataSourceParamDTO mysqlDatasourceParamDTO = new MySQLDataSourceParamDTO();
+        // mysqlDatasourceParamDTO.setHost("192.168.9.1");
+        // mysqlDatasourceParamDTO.setPort(1521);
+        // mysqlDatasourceParamDTO.setDatabase("im");
+        // mysqlDatasourceParamDTO.setUserName("test");
+        // mysqlDatasourceParamDTO.setPassword("123456");
+        // mysqlDatasourceParamDTO.setOther(other);
+        // ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(mysqlDatasourceParamDTO);
+        // String expected =
+        // "{\"user\":\"test\",\"password\":\"bnVsbE1USXpORFUy\",\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/"
+        // + "im\",\"driverClassName\":\"com.mysql.cj.jdbc.Driver\",\"validationQuery\":\"select
+        // 1\",\"props\":{\"autoDeserialize\":\"yes\",\"allowUrlInLocalInfile\":\"true\"}}";
+        // Assertions.assertEquals(expected, JSONUtils.toJsonString(connectionParam));
+        // }
     }
 
     /**
@@ -475,15 +497,12 @@ public class DataSourceServiceTest {
 
         ConnectionParam connectionParam = DataSourceUtils.buildConnectionParams(postgreSqlDatasourceParam);
 
         PowerMockito.mockStatic(DataSourceUtils.class);
-        PowerMockito.mockStatic(DataSourceClientProvider.class);
-        DataSourceClientProvider clientProvider = PowerMockito.mock(DataSourceClientProvider.class);
-        PowerMockito.when(DataSourceClientProvider.getInstance()).thenReturn(clientProvider);
-
+        PowerMockito.when(DataSourceUtils.getConnection(Mockito.any(), Mockito.any())).thenReturn(null);
         Result result = dataSourceService.checkConnection(dataSourceType, connectionParam);
         Assert.assertEquals(Status.CONNECTION_TEST_FAILURE.getCode(), result.getCode().intValue());
 
         Connection connection = PowerMockito.mock(Connection.class);
-        PowerMockito.when(clientProvider.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection);
+        PowerMockito.when(DataSourceUtils.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection);
         result = dataSourceService.checkConnection(dataSourceType, connectionParam);
         Assert.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue());
 
diff --git a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
index 0841e33eeb..773ad501db 100644
--- a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
+++ b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtils.java
@@ -79,8 +79,8 @@ public class CommonUtils {
      * @param loginUserKeytabPath loginUserKeytabPath
      * @throws IOException errors
      */
-    public static void loadKerberosConf(String javaSecurityKrb5Conf, String loginUserKeytabUsername,
-                                        String loginUserKeytabPath) throws IOException {
+    public static synchronized void loadKerberosConf(String javaSecurityKrb5Conf, String loginUserKeytabUsername,
+                                                     String loginUserKeytabPath) throws IOException {
         Configuration configuration = new Configuration();
         configuration.setClassLoader(configuration.getClass().getClassLoader());
         loadKerberosConf(javaSecurityKrb5Conf, loginUserKeytabUsername, loginUserKeytabPath, configuration);
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
index a6b3d7da7b..61b67dd2dd 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
@@ -17,29 +17,15 @@
 
 package org.apache.dolphinscheduler.plugin.task.datax;
 
-import com.alibaba.druid.sql.ast.SQLStatement;
-import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
-import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
-import com.alibaba.druid.sql.ast.statement.SQLSelect;
-import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
-import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
-import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
-import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
-import com.alibaba.druid.sql.parser.SQLStatementParser;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
-import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
 import org.apache.dolphinscheduler.plugin.task.api.TaskException;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.model.Property;
@@ -74,11 +60,23 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.alibaba.druid.sql.ast.SQLStatement;
+import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
+import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
+import com.alibaba.druid.sql.ast.statement.SQLSelect;
+import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
+import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
+import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
+import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
+import com.alibaba.druid.sql.parser.SQLStatementParser;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class DataxTask extends AbstractTask {
+
     /**
      * jvm parameters
      */
@@ -141,7 +139,8 @@ public class DataxTask extends AbstractTask {
             throw new RuntimeException("datax task params is not valid");
         }
 
-        dataxTaskExecutionContext = dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+        dataxTaskExecutionContext =
+                dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
     }
 
     /**
@@ -195,8 +194,7 @@ public class DataxTask extends AbstractTask {
      * @return datax json file name
      * @throws Exception if error throws Exception
      */
-    private String buildDataxJsonFile(Map paramsMap)
-            throws Exception {
+    private String buildDataxJsonFile(Map paramsMap) throws Exception {
         // generate json
         String fileName = String.format("%s/%s_job.json",
                 taskExecutionContext.getExecutePath(),
@@ -273,7 +271,8 @@ public class DataxTask extends AbstractTask {
         ArrayNode tableArr = writerConn.putArray("table");
         tableArr.add(dataXParameters.getTargetTable());
 
-        writerConn.put("jdbcUrl", DataSourceUtils.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()), dataTargetCfg));
+        writerConn.put("jdbcUrl",
+                DataSourceUtils.getJdbcUrl(DbType.valueOf(dataXParameters.getDtType()), dataTargetCfg));
         writerConnArr.add(writerConn);
 
         ObjectNode writerParam = JSONUtils.createObjectNode();
@@ -379,8 +378,7 @@ public class DataxTask extends AbstractTask {
      * @return shell command file name
      * @throws Exception if error throws Exception
      */
-    private String buildShellCommandFile(String jobConfigFilePath, Map paramsMap)
-            throws Exception {
+    private String buildShellCommandFile(String jobConfigFilePath, Map paramsMap) throws Exception {
         // generate scripts
         String fileName = String.format("%s/%s_node.%s",
                 taskExecutionContext.getExecutePath(),
@@ -468,7 +466,8 @@ public class DataxTask extends AbstractTask {
      * @param sql sql for data synchronization
     * @return Keyword converted column names
     */
-    private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg, String sql) {
+    private String[] parsingSqlColumnNames(DbType sourceType, DbType targetType, BaseConnectionParam dataSourceCfg,
+                                           String sql) {
         String[] columnNames = tryGrammaticalAnalysisSqlColumnNames(sourceType, sql);
 
         if (columnNames == null || columnNames.length == 0) {
@@ -565,7 +564,7 @@ public class DataxTask extends AbstractTask {
         sql = sql.replace(";", "");
 
         try (
-                Connection connection = DataSourceClientProvider.getInstance().getConnection(sourceType, baseDataSource);
+                Connection connection = DataSourceUtils.getConnection(sourceType, baseDataSource);
                 PreparedStatement stmt = connection.prepareStatement(sql);
                 ResultSet resultSet = stmt.executeQuery()) {
 
@@ -573,9 +572,9 @@ public class DataxTask extends AbstractTask {
             int num = md.getColumnCount();
             columnNames = new String[num];
             for (int i = 1; i <= num; i++) {
-                columnNames[i - 1] = md.getColumnName(i).replace("t.","");
+                columnNames[i - 1] = md.getColumnName(i).replace("t.", "");
             }
-        } catch (SQLException | ExecutionException e) {
+        } catch (SQLException e) {
             logger.error(e.getMessage(), e);
             return null;
         }
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index 69d04db280..e21c9b9f26 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
 import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
 
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
@@ -108,7 +107,7 @@ public class ProcedureTask extends AbstractTask {
                     procedureTaskExecutionContext.getConnectionParams());
 
             // get jdbc connection
-            connection = DataSourceClientProvider.getInstance().getConnection(dbType, connectionParam);
+            connection = DataSourceUtils.getConnection(dbType, connectionParam);
 
             Map sqlParamsMap = new HashMap<>();
             Map paramsMap = taskExecutionContext.getPrepareParamsMap() == null ? Maps.newHashMap()
diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index 9e4c1df861..6d021c7fdf 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -17,12 +17,7 @@
 
 package org.apache.dolphinscheduler.plugin.task.sql;
 
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
 import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@@ -44,8 +39,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
 import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
 import org.apache.dolphinscheduler.spi.enums.DbType;
 
-import org.slf4j.Logger;
-
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 
@@ -65,6 +58,11 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.slf4j.Logger;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
 public class SqlTask extends AbstractTask {
 
     /**
@@ -86,7 +84,8 @@ public class SqlTask extends AbstractTask {
      * create function format
      * include replace here which can be compatible with more cases, for example a long-running Spark session in Kyuubi will keep its own temp functions instead of destroying them right away
     */
-    private static final String CREATE_OR_REPLACE_FUNCTION_FORMAT = "create or replace temporary function {0} as ''{1}''";
+    private static final String CREATE_OR_REPLACE_FUNCTION_FORMAT =
+            "create or replace temporary function {0} as ''{1}''";
 
     /**
      * default query sql limit
@@ -110,7 +109,8 @@ public class SqlTask extends AbstractTask {
             throw new RuntimeException("sql task params is not valid");
         }
 
-        sqlTaskExecutionContext = sqlParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
+        sqlTaskExecutionContext =
+                sqlParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
     }
 
     @Override
@@ -121,7 +121,8 @@ public class SqlTask extends AbstractTask {
     @Override
     public void handle(TaskCallBack taskCallBack) throws TaskException {
         logger.info("Full sql parameters: {}", sqlParameters);
-        logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}",
+        logger.info(
+                "sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}",
                 sqlParameters.getType(),
                 sqlParameters.getDatasource(),
                 sqlParameters.getSql(),
@@ -139,10 +140,11 @@ public class SqlTask extends AbstractTask {
                     sqlTaskExecutionContext.getConnectionParams());
 
             // ready to execute SQL and parameter entity Map
-            List mainStatementSqlBinds = SqlSplitter.split(sqlParameters.getSql(), sqlParameters.getSegmentSeparator())
-                    .stream()
-                    .map(this::getSqlAndSqlParamsMap)
-                    .collect(Collectors.toList());
+            List mainStatementSqlBinds =
+                    SqlSplitter.split(sqlParameters.getSql(), sqlParameters.getSegmentSeparator())
+                            .stream()
+                            .map(this::getSqlAndSqlParamsMap)
+                            .collect(Collectors.toList());
 
             List preStatementSqlBinds = Optional.ofNullable(sqlParameters.getPreStatements())
                     .orElse(new ArrayList<>())
@@ -190,7 +192,7 @@ public class SqlTask extends AbstractTask {
         try {
             // create connection
-            connection = DataSourceClientProvider.getInstance().getConnection(DbType.valueOf(sqlParameters.getType()), baseConnectionParam);
+            connection = DataSourceUtils.getConnection(DbType.valueOf(sqlParameters.getType()), baseConnectionParam);
             // create temp function
             if (CollectionUtils.isNotEmpty(createFuncs)) {
                 createTempFunction(connection, createFuncs);
             }
@@ -210,7 +212,7 @@ public class SqlTask extends AbstractTask {
                 String updateResult = executeUpdate(connection, mainStatementsBinds, "main");
                 result = setNonQuerySqlReturn(updateResult, sqlParameters.getLocalParams());
             }
-            //deal out params
+            // deal out params
             sqlParameters.dealOutParam(result);
 
             // post execute
@@ -265,7 +267,8 @@ public class SqlTask extends AbstractTask {
                 resultJSONArray.add(mapOfColValues);
                 rowCount++;
             }
-            int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows() : TaskConstants.DEFAULT_DISPLAY_ROWS;
+            int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows()
+                    : TaskConstants.DEFAULT_DISPLAY_ROWS;
             displayRows = Math.min(displayRows, rowCount);
             logger.info("display sql result {} rows as follows:", displayRows);
             for (int i = 0; i < displayRows; i++) {
@@ -305,12 +308,14 @@ public class SqlTask extends AbstractTask {
         }
     }
 
-    private String executeUpdate(Connection connection, List statementsBinds, String handlerType) throws Exception {
+    private String executeUpdate(Connection connection, List statementsBinds,
+                                 String handlerType) throws Exception {
         int result = 0;
         for (SqlBinds sqlBind : statementsBinds) {
             try (PreparedStatement statement = prepareStatementAndBind(connection, sqlBind)) {
                 result = statement.executeUpdate();
-                logger.info("{} statement execute update result: {}, for sql: {}", handlerType, result, sqlBind.getSql());
+                logger.info("{} statement execute update result: {}, for sql: {}", handlerType, result,
+                        sqlBind.getSql());
             }
         }
         return String.valueOf(result);
@@ -371,7 +376,8 @@ public class SqlTask extends AbstractTask {
                     ParameterUtils.setInParameter(entry.getKey(), stmt, prop.getType(), prop.getValue());
                 }
             }
-            logger.info("prepare statement replace sql : {}, sql parameters : {}", sqlBinds.getSql(), sqlBinds.getParamsMap());
+            logger.info("prepare statement replace sql : {}, sql parameters : {}", sqlBinds.getSql(),
+                    sqlBinds.getParamsMap());
             return stmt;
         } catch (Exception exception) {
             throw new TaskException("SQL task prepareStatementAndBind error", exception);
         }
@@ -387,14 +393,15 @@ public class SqlTask extends AbstractTask {
     * @param sqlParamsMap sql params map
     */
     private void printReplacedSql(String content, String formatSql, String rgex, Map sqlParamsMap) {
-        //parameter print style
+        // parameter print style
         logger.info("after replace sql , preparing : {}", formatSql);
         StringBuilder logPrint = new StringBuilder("replaced sql , parameters:");
         if (sqlParamsMap == null) {
             logger.info("printReplacedSql: sqlParamsMap is null.");
         } else {
             for (int i = 1; i <= sqlParamsMap.size(); i++) {
-                logPrint.append(sqlParamsMap.get(i).getValue()).append("(").append(sqlParamsMap.get(i).getType()).append(")");
+                logPrint.append(sqlParamsMap.get(i).getValue()).append("(").append(sqlParamsMap.get(i).getType())
+                        .append(")");
             }
         }
         logger.info("Sql Params are {}", logPrint);
@@ -428,8 +435,8 @@ public class SqlTask extends AbstractTask {
         }
 
         // special characters need to be escaped, ${} needs to be escaped
-        setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap,taskExecutionContext.getTaskInstanceId());
-        //Replace the original value in sql !{...} ,Does not participate in precompilation
+        setSqlParamsMap(sql, rgex, sqlParamsMap, paramsMap, taskExecutionContext.getTaskInstanceId());
+        // Replace the original value in sql !{...} ,Does not participate in precompilation
         String rgexo = "['\"]*\\!\\{(.*?)\\}['\"]*";
         sql = replaceOriginalValue(sql, rgexo, paramsMap);
         // replace the ${} of the SQL statement with the Placeholder
@@ -485,7 +492,8 @@ public class SqlTask extends AbstractTask {
     */
     private List buildTempFuncSql(List udfFuncParameters) {
         return udfFuncParameters.stream().map(value -> MessageFormat
-                .format(CREATE_OR_REPLACE_FUNCTION_FORMAT, value.getFuncName(), value.getClassName())).collect(Collectors.toList());
+                .format(CREATE_OR_REPLACE_FUNCTION_FORMAT, value.getFuncName(), value.getClassName()))
+                .collect(Collectors.toList());
     }
 
     /**
@@ -499,7 +507,8 @@ public class SqlTask extends AbstractTask {
             String prefixPath = defaultFS.startsWith("file://") ? "file://" : defaultFS;
             String uploadPath = CommonUtils.getHdfsUdfDir(value.getTenantCode());
             String resourceFullName = value.getResourceName();
-            resourceFullName = resourceFullName.startsWith("/") ? resourceFullName : String.format("/%s", resourceFullName);
+            resourceFullName =
+                    resourceFullName.startsWith("/") ? resourceFullName : String.format("/%s", resourceFullName);
             return String.format("add jar %s%s%s", prefixPath, uploadPath, resourceFullName);
         }).collect(Collectors.toList());
     }