Browse Source

Merge remote-tracking branch 'upstream/dev' into spilit

pull/3/MERGE
lenboo 4 years ago
parent
commit
cf63fc1fee
  1. 20
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 63
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
  3. 19
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
  4. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java
  5. 75
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java
  6. 47
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSourceTest.java
  7. 13
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
  8. 1
      pom.xml

20
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -22,7 +22,7 @@ import java.util.Locale;
import org.springframework.context.i18n.LocaleContextHolder; import org.springframework.context.i18n.LocaleContextHolder;
/** /**
* status enum * status enum // todo #4855 One category one interval
*/ */
public enum Status { public enum Status {
@ -194,12 +194,17 @@ public enum Status {
BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition error", "移动工作流错误"), BATCH_MOVE_PROCESS_DEFINITION_ERROR(10160, "batch move process definition error", "移动工作流错误"),
QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error", "查询血缘失败"), QUERY_WORKFLOW_LINEAGE_ERROR(10161, "query workflow lineage error", "查询血缘失败"),
QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"), QUERY_AUTHORIZED_AND_USER_CREATED_PROJECT_ERROR(10162, "query authorized and user created project error error", "查询授权的和用户创建的项目错误"),
DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10163,"delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"), DELETE_PROCESS_DEFINITION_BY_ID_FAIL(10163, "delete process definition by id fail, for there are {0} process instances in executing using it", "删除工作流定义失败,有[{0}]个运行中的工作流实例正在使用"),
CHECK_OS_TENANT_CODE_ERROR(10164, "Please enter the English os tenant code", "请输入英文操作系统租户"), CHECK_OS_TENANT_CODE_ERROR(10164, "Please enter the English os tenant code", "请输入英文操作系统租户"),
FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"), FORCE_TASK_SUCCESS_ERROR(10165, "force task success error", "强制成功任务实例错误"),
TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"), TASK_INSTANCE_STATE_OPERATION_ERROR(10166, "the status of task instance {0} is {1},Cannot perform force success operation", "任务实例[{0}]的状态是[{1}],无法执行强制成功操作"),
DATASOURCE_TYPE_NOT_EXIST(10167, "data source type not exist", "数据源类型不存在"), DATASOURCE_TYPE_NOT_EXIST(10167, "data source type not exist", "数据源类型不存在"),
PROCESS_DEFINITION_NAME_EXIST(10168, "process definition name {0} already exists", "工作流定义名称[{0}]已存在"), PROCESS_DEFINITION_NAME_EXIST(10168, "process definition name {0} already exists", "工作流定义名称[{0}]已存在"),
DATASOURCE_DB_TYPE_ILLEGAL(10169, "datasource type illegal", "数据源类型参数不合法"),
DATASOURCE_PORT_ILLEGAL(10170, "datasource port illegal", "数据源端口参数不合法"),
DATASOURCE_OTHER_PARAMS_ILLEGAL(10171, "datasource other params illegal", "数据源其他参数不合法"),
DATASOURCE_NAME_ILLEGAL(10172, "datasource name illegal", "数据源名称不合法"),
DATASOURCE_HOST_ILLEGAL(10173, "datasource host illegal", "数据源HOST不合法"),
UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"),
UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"),
@ -251,7 +256,7 @@ public enum Status {
BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026, "batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"), BATCH_DELETE_PROCESS_DEFINE_BY_IDS_ERROR(50026, "batch delete process definition by ids {0} error", "批量删除工作流定义[{0}]错误"),
TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"), TENANT_NOT_SUITABLE(50027, "there is not any tenant suitable, please choose a tenant available.", "没有合适的租户,请选择可用的租户"),
EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"), EXPORT_PROCESS_DEFINE_BY_ID_ERROR(50028, "export process definition by id error", "导出工作流定义错误"),
BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028,"batch export process definition by ids error", "批量导出工作流定义错误"), BATCH_EXPORT_PROCESS_DEFINE_BY_IDS_ERROR(50028, "batch export process definition by ids error", "批量导出工作流定义错误"),
IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"), IMPORT_PROCESS_DEFINE_ERROR(50029, "import process definition error", "导入工作流定义错误"),
TASK_DEFINE_NOT_EXIST(50030, "task definition {0} does not exist", "任务定义[{0}]不存在"), TASK_DEFINE_NOT_EXIST(50030, "task definition {0} does not exist", "任务定义[{0}]不存在"),
DELETE_TASK_DEFINE_BY_CODE_ERROR(50031, "delete task definition by code error", "删除任务定义错误"), DELETE_TASK_DEFINE_BY_CODE_ERROR(50031, "delete task definition by code error", "删除任务定义错误"),
@ -293,11 +298,10 @@ public enum Status {
GET_ALERT_PLUGIN_INSTANCE_ERROR(110007, "get alert plugin instance error", "获取告警组和告警组插件实例错误"), GET_ALERT_PLUGIN_INSTANCE_ERROR(110007, "get alert plugin instance error", "获取告警组和告警组插件实例错误"),
CREATE_ALERT_PLUGIN_INSTANCE_ERROR(110008, "create alert plugin instance error", "创建告警组和告警组插件实例错误"), CREATE_ALERT_PLUGIN_INSTANCE_ERROR(110008, "create alert plugin instance error", "创建告警组和告警组插件实例错误"),
QUERY_ALL_ALERT_PLUGIN_INSTANCE_ERROR(110009, "query all alert plugin instance error", "查询所有告警实例失败"), QUERY_ALL_ALERT_PLUGIN_INSTANCE_ERROR(110009, "query all alert plugin instance error", "查询所有告警实例失败"),
PLUGIN_INSTANCE_ALREADY_EXIT(110010,"plugin instance already exit","该告警插件实例已存在"), PLUGIN_INSTANCE_ALREADY_EXIT(110010, "plugin instance already exit", "该告警插件实例已存在"),
LIST_PAGING_ALERT_PLUGIN_INSTANCE_ERROR(110011,"query plugin instance page error","分页查询告警实例失败"), LIST_PAGING_ALERT_PLUGIN_INSTANCE_ERROR(110011, "query plugin instance page error", "分页查询告警实例失败"),
DELETE_ALERT_PLUGIN_INSTANCE_ERROR_HAS_ALERT_GROUP_ASSOCIATED(110012,"failed to delete the alert instance, there is an alarm group associated with this alert instance","删除告警实例失败,存在与此告警实例关联的警报组") DELETE_ALERT_PLUGIN_INSTANCE_ERROR_HAS_ALERT_GROUP_ASSOCIATED(110012, "failed to delete the alert instance, there is an alarm group associated with this alert instance",
"删除告警实例失败,存在与此告警实例关联的警报组");
;
private final int code; private final int code;
private final String enMsg; private final String enMsg;

63
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.api.service.impl; package org.apache.dolphinscheduler.api.service.impl;
import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.DataSourceService; import org.apache.dolphinscheduler.api.service.DataSourceService;
import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.api.utils.Result;
@ -29,12 +30,15 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.datasource.MySQLDataSource;
import org.apache.dolphinscheduler.dao.datasource.OracleDataSource; import org.apache.dolphinscheduler.dao.datasource.OracleDataSource;
import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper; import org.apache.dolphinscheduler.dao.mapper.DataSourceUserMapper;
import org.apache.commons.collections4.MapUtils;
import java.sql.Connection; import java.sql.Connection;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
@ -44,6 +48,7 @@ import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.regex.Pattern;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -73,6 +78,15 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
public static final String USER_NAME = "userName"; public static final String USER_NAME = "userName";
public static final String OTHER = "other"; public static final String OTHER = "other";
private static final Pattern IPV4_PATTERN = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.]+$");
private static final Pattern IPV6_PATTERN = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.\\:\\[\\]]+$");
private static final Pattern DATABASE_PATTER = Pattern.compile("^[a-zA-Z0-9\\_\\-\\.]+$");
private static final Pattern PARAMS_PATTER = Pattern.compile("^[a-zA-Z0-9]+$");
@Autowired @Autowired
private DataSourceMapper dataSourceMapper; private DataSourceMapper dataSourceMapper;
@ -315,8 +329,6 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
/** /**
* handle datasource connection password for safety * handle datasource connection password for safety
*
* @param dataSourceList
*/ */
private void handlePasswd(List<DataSource> dataSourceList) { private void handlePasswd(List<DataSource> dataSourceList) {
for (DataSource dataSource : dataSourceList) { for (DataSource dataSource : dataSourceList) {
@ -404,7 +416,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
return result; return result;
} catch (Exception e) { } catch (Exception e) {
logger.error("datasource test connection error, dbType:{}, jdbcUrl:{}, message:{}.", type, datasource.getJdbcUrl(), e.getMessage()); logger.error("datasource test connection error, dbType:{}, jdbcUrl:{}, message:{}.", type, datasource.getJdbcUrl(), e.getMessage());
return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(),e.getMessage()); return new Result<>(Status.CONNECTION_TEST_FAILURE.getCode(), e.getMessage());
} }
} }
@ -443,7 +455,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
String port, String database, String principal, String userName, String port, String database, String principal, String userName,
String password, DbConnectType connectType, String other, String password, DbConnectType connectType, String other,
String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath) { String javaSecurityKrb5Conf, String loginUserKeytabUsername, String loginUserKeytabPath) {
checkParams(type, port, host, database, other);
String address = buildAddress(type, host, port, connectType); String address = buildAddress(type, host, port, connectType);
Map<String, Object> parameterMap = new LinkedHashMap<>(); Map<String, Object> parameterMap = new LinkedHashMap<>();
String jdbcUrl; String jdbcUrl;
@ -491,9 +503,13 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
} }
Map<String, String> map = JSONUtils.toMap(other); Map<String, String> map = JSONUtils.toMap(other);
if (map != null) { if (type == DbType.MYSQL) {
map = MySQLDataSource.buildOtherParams(other);
}
if (MapUtils.isNotEmpty(map)) {
StringBuilder otherSb = new StringBuilder(); StringBuilder otherSb = new StringBuilder();
for (Map.Entry<String, String> entry: map.entrySet()) { for (Map.Entry<String, String> entry : map.entrySet()) {
otherSb.append(String.format("%s=%s%s", entry.getKey(), entry.getValue(), separator)); otherSb.append(String.format("%s=%s%s", entry.getKey(), entry.getValue(), separator));
} }
if (!Constants.DB2.equals(type.name())) { if (!Constants.DB2.equals(type.name())) {
@ -668,4 +684,39 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
result[1] = port; result[1] = port;
return result; return result;
} }
private void checkParams(DbType type, String port, String host, String database, String other) {
if (null == DbType.of(type.getCode())) {
throw new ServiceException(Status.DATASOURCE_DB_TYPE_ILLEGAL);
}
if (!isNumeric(port)) {
throw new ServiceException(Status.DATASOURCE_PORT_ILLEGAL);
}
if (!IPV4_PATTERN.matcher(host).matches() || !IPV6_PATTERN.matcher(host).matches()) {
throw new ServiceException(Status.DATASOURCE_HOST_ILLEGAL);
}
if (!DATABASE_PATTER.matcher(database).matches()) {
throw new ServiceException(Status.DATASOURCE_NAME_ILLEGAL);
}
if (StringUtils.isBlank(other)) {
return;
}
Map<String, String> map = JSONUtils.toMap(other);
if (MapUtils.isEmpty(map)) {
return;
}
boolean paramsCheck = map.entrySet().stream().allMatch(p -> PARAMS_PATTER.matcher(p.getValue()).matches());
if (!paramsCheck) {
throw new ServiceException(Status.DATASOURCE_OTHER_PARAMS_ILLEGAL);
}
}
private static boolean isNumeric(String str) {
for (int i = str.length(); --i >= 0; ) {
if (!Character.isDigit(str.charAt(i))) {
return false;
}
}
return true;
}
} }

19
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java

@ -87,7 +87,7 @@ public class DataSourceServiceTest {
// data source exits // data source exits
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null); PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
Result connectionResult = new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(),Status.DATASOURCE_CONNECT_FAILED.getMsg()); Result connectionResult = new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), Status.DATASOURCE_CONNECT_FAILED.getMsg());
//PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult); //PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult);
PowerMockito.doReturn(connectionResult).when(dataSourceService).checkConnection(dataSourceType, parameter); PowerMockito.doReturn(connectionResult).when(dataSourceService).checkConnection(dataSourceType, parameter);
Result connectFailedResult = dataSourceService.createDataSource(loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter); Result connectFailedResult = dataSourceService.createDataSource(loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter);
@ -95,7 +95,7 @@ public class DataSourceServiceTest {
// data source exits // data source exits
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null); PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName.trim())).thenReturn(null);
connectionResult = new Result(Status.SUCCESS.getCode(),Status.SUCCESS.getMsg()); connectionResult = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult); PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult);
PowerMockito.when(DataSourceFactory.getDatasource(dataSourceType, parameter)).thenReturn(null); PowerMockito.when(DataSourceFactory.getDatasource(dataSourceType, parameter)).thenReturn(null);
Result notValidError = dataSourceService.createDataSource(loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter); Result notValidError = dataSourceService.createDataSource(loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter);
@ -141,7 +141,7 @@ public class DataSourceServiceTest {
// data source connect failed // data source connect failed
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource); PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null); PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null);
Result connectionResult = new Result(Status.SUCCESS.getCode(),Status.SUCCESS.getMsg()); Result connectionResult = new Result(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg());
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult); PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult);
Result connectFailed = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter); Result connectFailed = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter);
Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED.getCode(), connectFailed.getCode().intValue()); Assert.assertEquals(Status.DATASOURCE_CONNECT_FAILED.getCode(), connectFailed.getCode().intValue());
@ -149,7 +149,7 @@ public class DataSourceServiceTest {
//success //success
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource); PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(dataSource);
PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null); PowerMockito.when(dataSourceMapper.queryDataSourceByName(dataSourceName)).thenReturn(null);
connectionResult = new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(),Status.DATASOURCE_CONNECT_FAILED.getMsg()); connectionResult = new Result(Status.DATASOURCE_CONNECT_FAILED.getCode(), Status.DATASOURCE_CONNECT_FAILED.getMsg());
PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult); PowerMockito.when(dataSourceService.checkConnection(dataSourceType, parameter)).thenReturn(connectionResult);
Result success = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter); Result success = dataSourceService.updateDataSource(dataSourceId, loginUser, dataSourceName, dataSourceDesc, dataSourceType, parameter);
Assert.assertEquals(Status.SUCCESS.getCode(), success.getCode().intValue()); Assert.assertEquals(Status.SUCCESS.getCode(), success.getCode().intValue());
@ -171,7 +171,7 @@ public class DataSourceServiceTest {
int dataSourceId = -1; int dataSourceId = -1;
PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null); PowerMockito.when(dataSourceMapper.selectById(dataSourceId)).thenReturn(null);
Result result = dataSourceService.connectionTest(dataSourceId); Result result = dataSourceService.connectionTest(dataSourceId);
Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getCode(),result.getCode().intValue()); Assert.assertEquals(Status.RESOURCE_NOT_EXIST.getCode(), result.getCode().intValue());
} }
@Test @Test
@ -280,7 +280,7 @@ public class DataSourceServiceTest {
@Test @Test
public void buildParameter() { public void buildParameter() {
String param = dataSourceService.buildParameter(DbType.ORACLE, "192.168.9.1", "1521", "im" String param = dataSourceService.buildParameter(DbType.ORACLE, "192.168.9.1", "1521", "im"
, "", "test", "test", DbConnectType.ORACLE_SERVICE_NAME, "", "", "",""); , "", "test", "test", DbConnectType.ORACLE_SERVICE_NAME, "", "", "", "");
String expected = "{\"connectType\":\"ORACLE_SERVICE_NAME\",\"type\":\"ORACLE_SERVICE_NAME\",\"address\":\"jdbc:oracle:thin:@//192.168.9.1:1521\",\"database\":\"im\"," String expected = "{\"connectType\":\"ORACLE_SERVICE_NAME\",\"type\":\"ORACLE_SERVICE_NAME\",\"address\":\"jdbc:oracle:thin:@//192.168.9.1:1521\",\"database\":\"im\","
+ "\"jdbcUrl\":\"jdbc:oracle:thin:@//192.168.9.1:1521/im\",\"user\":\"test\",\"password\":\"test\"}"; + "\"jdbcUrl\":\"jdbc:oracle:thin:@//192.168.9.1:1521/im\",\"user\":\"test\",\"password\":\"test\"}";
Assert.assertEquals(expected, param); Assert.assertEquals(expected, param);
@ -300,8 +300,9 @@ public class DataSourceServiceTest {
@Test @Test
public void buildParameterWithDecodePassword() { public void buildParameterWithDecodePassword() {
PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE, "true"); PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE, "true");
String other = "{\"autoDeserialize\":\"yes\",\"allowUrlInLocalInfile\":\"true\"}";
String param = dataSourceService.buildParameter(DbType.MYSQL, "192.168.9.1", "1521", "im" String param = dataSourceService.buildParameter(DbType.MYSQL, "192.168.9.1", "1521", "im"
, "", "test", "123456", null, "", "", "", ""); , "", "test", "123456", null, other, "", "", "");
String expected = "{\"type\":null,\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/im\"," String expected = "{\"type\":null,\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/im\","
+ "\"user\":\"test\",\"password\":\"IUAjJCVeJipNVEl6TkRVMg==\"}"; + "\"user\":\"test\",\"password\":\"IUAjJCVeJipNVEl6TkRVMg==\"}";
Assert.assertEquals(expected, param); Assert.assertEquals(expected, param);
@ -309,7 +310,8 @@ public class DataSourceServiceTest {
PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE, "false"); PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE, "false");
param = dataSourceService.buildParameter(DbType.MYSQL, "192.168.9.1", "1521", "im" param = dataSourceService.buildParameter(DbType.MYSQL, "192.168.9.1", "1521", "im"
, "", "test", "123456", null, "", "", "", ""); , "", "test", "123456", null, "", "", "", "");
expected = "{\"type\":null,\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/im\",\"user\":\"test\",\"password\":\"123456\"}"; expected = "{\"type\":null,\"address\":\"jdbc:mysql://192.168.9.1:1521\",\"database\":\"im\",\"jdbcUrl\":\"jdbc:mysql://192.168.9.1:1521/im\","
+ "\"user\":\"test\",\"password\":\"123456\"}";
Assert.assertEquals(expected, param); Assert.assertEquals(expected, param);
} }
@ -328,7 +330,6 @@ public class DataSourceServiceTest {
/** /**
* test check connection * test check connection
* @throws Exception
*/ */
@Test @Test
public void testCheckConnection() throws Exception { public void testCheckConnection() throws Exception {

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/DbType.java

@ -75,6 +75,6 @@ public enum DbType {
if(DB_TYPE_MAP.containsKey(type)){ if(DB_TYPE_MAP.containsKey(type)){
return DB_TYPE_MAP.get(type); return DB_TYPE_MAP.get(type);
} }
throw new IllegalArgumentException("invalid type : " + type); return null;
} }
} }

75
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSource.java

@ -14,11 +14,19 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.datasource; package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.commons.collections4.MapUtils;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -27,14 +35,25 @@ import org.slf4j.LoggerFactory;
*/ */
public class MySQLDataSource extends BaseDataSource { public class MySQLDataSource extends BaseDataSource {
private final Logger logger = LoggerFactory.getLogger(MySQLDataSource.class); private static final Logger logger = LoggerFactory.getLogger(MySQLDataSource.class);
private static final String ALLOW_LOAD_LOCAL_IN_FILE_NAME = "allowLoadLocalInfile";
private static final String AUTO_DESERIALIZE = "autoDeserialize";
private static final String ALLOW_LOCAL_IN_FILE_NAME = "allowLocalInfile";
private static final String ALLOW_URL_IN_LOCAL_IN_FILE_NAME = "allowUrlInLocalInfile";
private final String sensitiveParam = "autoDeserialize=true"; private static final String APPEND_PARAMS = "allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false";
private final char symbol = '&'; private static boolean checkKeyIsLegitimate(String key) {
return !key.contains(ALLOW_LOAD_LOCAL_IN_FILE_NAME) && !key.contains(AUTO_DESERIALIZE) && !key.contains(ALLOW_LOCAL_IN_FILE_NAME) && !key.contains(ALLOW_URL_IN_LOCAL_IN_FILE_NAME);
}
/** /**
* gets the JDBC url for the data source connection * gets the JDBC url for the data source connection
*
* @return jdbc url * @return jdbc url
*/ */
@Override @Override
@ -50,43 +69,51 @@ public class MySQLDataSource extends BaseDataSource {
return DbType.MYSQL; return DbType.MYSQL;
} }
@Override public static Map<String, String> buildOtherParams(String other) {
protected String filterOther(String other){ if (StringUtils.isBlank(other)) {
if(StringUtils.isBlank(other)){ return null;
return "";
} }
if(other.contains(sensitiveParam)){ Map<String, String> paramMap = JSONUtils.toMap(other);
int index = other.indexOf(sensitiveParam); if (MapUtils.isEmpty(paramMap)) {
String tmp = sensitiveParam; return null;
if(index == 0 || other.charAt(index + 1) == symbol){
tmp = tmp + symbol;
} else if(other.charAt(index - 1) == symbol){
tmp = symbol + tmp;
} }
logger.warn("sensitive param : {} in otherParams field is filtered", tmp); Map<String, String> newParamMap = new HashMap<>();
other = other.replace(tmp, ""); paramMap.forEach((k, v) -> {
if (!checkKeyIsLegitimate(k)) {
return;
} }
logger.debug("other : {}", other); newParamMap.put(k, v);
return other;
});
return newParamMap;
} }
@Override @Override
public String getUser() { public String getUser() {
if(user.contains(sensitiveParam)){ if (user.contains(AUTO_DESERIALIZE)) {
logger.warn("sensitive param : {} in username field is filtered", sensitiveParam); logger.warn("sensitive param : {} in username field is filtered", AUTO_DESERIALIZE);
user = user.replace(sensitiveParam, ""); user = user.replace(AUTO_DESERIALIZE, "");
} }
logger.debug("username : {}", user); logger.debug("username : {}", user);
return user; return user;
} }
@Override
protected String filterOther(String otherParams) {
if (StringUtils.isBlank(otherParams)) {
return APPEND_PARAMS;
}
char symbol = '&';
return otherParams + symbol + APPEND_PARAMS;
}
@Override @Override
public String getPassword() { public String getPassword() {
// password need decode // password need decode
password = super.getPassword(); password = super.getPassword();
if(password.contains(sensitiveParam)){ if (password.contains(AUTO_DESERIALIZE)) {
logger.warn("sensitive param : {} in password field is filtered", sensitiveParam); logger.warn("sensitive param : {} in password field is filtered", AUTO_DESERIALIZE);
password = password.replace(sensitiveParam, ""); password = password.replace(AUTO_DESERIALIZE, "");
} }
return password; return password;
} }

47
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/datasource/MySQLDataSourceTest.java

@ -14,10 +14,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.dao.datasource; package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -27,57 +29,58 @@ import org.junit.Test;
public class MySQLDataSourceTest { public class MySQLDataSourceTest {
@Test @Test
public void testGetUser(){ public void testGetUser() {
MySQLDataSource dataSource = new MySQLDataSource(); MySQLDataSource dataSource = new MySQLDataSource();
String safeUsername= "test123"; String safeUsername = "test123";
dataSource.setUser(safeUsername); dataSource.setUser(safeUsername);
Assert.assertEquals("test123", dataSource.getUser()); Assert.assertEquals("test123", dataSource.getUser());
String sensitiveUsername= "test123?autoDeserialize=true"; String sensitiveUsername = "test123?autoDeserialize=true";
dataSource.setUser(sensitiveUsername); dataSource.setUser(sensitiveUsername);
Assert.assertEquals("test123?", dataSource.getUser()); Assert.assertEquals("test123?=true", dataSource.getUser());
} }
@Test @Test
public void testGetPassword(){ public void testGetPassword() {
MySQLDataSource dataSource = new MySQLDataSource(); MySQLDataSource dataSource = new MySQLDataSource();
String safePwd= "test_pwd"; String safePwd = "test_pwd";
dataSource.setPassword(safePwd); dataSource.setPassword(safePwd);
Assert.assertEquals("test_pwd", dataSource.getPassword()); Assert.assertEquals("test_pwd", dataSource.getPassword());
String sensitivePwd= "test_pwd?autoDeserialize=true"; String sensitivePwd = "test_pwd?autoDeserialize=true";
dataSource.setPassword(sensitivePwd); dataSource.setPassword(sensitivePwd);
Assert.assertEquals("test_pwd?", dataSource.getPassword()); Assert.assertEquals("test_pwd?=true", dataSource.getPassword());
} }
@Test @Test
public void testFilterOther(){ public void testFilterOther() {
MySQLDataSource dataSource = new MySQLDataSource(); MySQLDataSource dataSource = new MySQLDataSource();
String other = dataSource.filterOther("serverTimezone=Asia/Shanghai&characterEncoding=utf8"); String other = dataSource.filterOther("serverTimezone=Asia/Shanghai&characterEncoding=utf8");
Assert.assertEquals("serverTimezone=Asia/Shanghai&characterEncoding=utf8", other); Assert.assertEquals("serverTimezone=Asia/Shanghai&characterEncoding=utf8&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false", other);
//at the first //at the first
other = dataSource.filterOther("autoDeserialize=true&serverTimezone=Asia/Shanghai&characterEncoding=utf8"); other = dataSource.filterOther("serverTimezone=Asia/Shanghai&characterEncoding=utf8");
Assert.assertEquals("serverTimezone=Asia/Shanghai&characterEncoding=utf8", other); Assert.assertEquals("serverTimezone=Asia/Shanghai&characterEncoding=utf8&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false", other);
//at the end //at the end
other = dataSource.filterOther("serverTimezone=Asia/Shanghai&characterEncoding=utf8&autoDeserialize=true"); other = dataSource.filterOther("serverTimezone=Asia/Shanghai&characterEncoding=utf8");
Assert.assertEquals("serverTimezone=Asia/Shanghai&characterEncoding=utf8", other); Assert.assertEquals("serverTimezone=Asia/Shanghai&characterEncoding=utf8&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false", other);
//in the middle //in the middle
other = dataSource.filterOther("serverTimezone=Asia/Shanghai&autoDeserialize=true&characterEncoding=utf8"); other = dataSource.filterOther("serverTimezone=Asia/Shanghai&characterEncoding=utf8");
Assert.assertEquals("serverTimezone=Asia/Shanghai&characterEncoding=utf8", other); Assert.assertEquals("serverTimezone=Asia/Shanghai&characterEncoding=utf8&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false", other);
other = dataSource.filterOther(null);
Assert.assertEquals("allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false", other);
} }
@Test @Test
public void testGetPasswordWithDecodePassword(){ public void testGetPasswordWithDecodePassword() {
MySQLDataSource dataSource = new MySQLDataSource(); MySQLDataSource dataSource = new MySQLDataSource();
String password= ""; String password = "";
dataSource.setPassword(password); dataSource.setPassword(password);
Assert.assertEquals("", dataSource.getPassword()); Assert.assertEquals("", dataSource.getPassword());
password= "IUAjJCVeJipNVEl6TkRVMg=="; password = "IUAjJCVeJipNVEl6TkRVMg==";
dataSource.setPassword(password); dataSource.setPassword(password);
Assert.assertNotNull(dataSource.getPassword()); Assert.assertNotNull(dataSource.getPassword());
Assert.assertNotNull(dataSource.getPassword()); Assert.assertNotNull(dataSource.getPassword());
dataSource.setPassword(password); dataSource.setPassword(password);
PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE,"true"); PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE, "true");
Assert.assertEquals("123456", dataSource.getPassword()); Assert.assertEquals("123456", dataSource.getPassword());
dataSource.setPassword(password); dataSource.setPassword(password);
@ -86,7 +89,7 @@ public class MySQLDataSourceTest {
Assert.assertEquals("123456", dataSource.getPassword()); Assert.assertEquals("123456", dataSource.getPassword());
dataSource.setPassword(password); dataSource.setPassword(password);
PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE,"false"); PropertyUtils.setValue(Constants.DATASOURCE_ENCRYPTION_ENABLE, "false");
Assert.assertEquals("IUAjJCVeJipNVEl6TkRVMg==", dataSource.getPassword()); Assert.assertEquals("IUAjJCVeJipNVEl6TkRVMg==", dataSource.getPassword());
dataSource.setPassword(password); dataSource.setPassword(password);

13
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java

@ -97,7 +97,8 @@ public class SqoopTaskTest {
SqoopJobGenerator generator = new SqoopJobGenerator(); SqoopJobGenerator generator = new SqoopJobGenerator();
String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams, mysqlTaskExecutionContext); String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams, mysqlTaskExecutionContext);
String mysqlToHdfsExpected = String mysqlToHdfsExpected =
"sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/test\" " "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect "
+ "\"jdbc:mysql://192.168.0.111:3306/test?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false\" "
+ "--username kylo --password \"123456\" --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile " + "--username kylo --password \"123456\" --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile "
+ "--delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'"; + "--delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
Assert.assertEquals(mysqlToHdfsExpected, mysqlToHdfsScript); Assert.assertEquals(mysqlToHdfsExpected, mysqlToHdfsScript);
@ -111,7 +112,8 @@ public class SqoopTaskTest {
SqoopParameters hdfsToMysqlParams = JSONUtils.parseObject(hdfsToMysql, SqoopParameters.class); SqoopParameters hdfsToMysqlParams = JSONUtils.parseObject(hdfsToMysql, SqoopParameters.class);
String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams, mysqlTaskExecutionContext); String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams, mysqlTaskExecutionContext);
String hdfsToMysqlScriptExpected = String hdfsToMysqlScriptExpected =
"sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect \"jdbc:mysql://192.168.0.111:3306/test\" " "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect "
+ "\"jdbc:mysql://192.168.0.111:3306/test?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false\" "
+ "--username kylo --password \"123456\" --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' " + "--username kylo --password \"123456\" --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' "
+ "--lines-terminated-by '\\n' --update-key id --update-mode allowinsert"; + "--lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
Assert.assertEquals(hdfsToMysqlScriptExpected, hdfsToMysqlScript); Assert.assertEquals(hdfsToMysqlScriptExpected, hdfsToMysqlScript);
@ -128,7 +130,8 @@ public class SqoopTaskTest {
String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams, mysqlTaskExecutionContext); String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams, mysqlTaskExecutionContext);
String hiveToMysqlExpected = String hiveToMysqlExpected =
"sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date " "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date "
+ "--hcatalog-partition-values 2020-02-17 --connect \"jdbc:mysql://192.168.0.111:3306/test\" --username kylo --password \"123456\" --table person_3 " + "--hcatalog-partition-values 2020-02-17 --connect \"jdbc:mysql://192.168.0.111:3306/test?allowLoadLocalInfile="
+ "false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false\" --username kylo --password \"123456\" --table person_3 "
+ "--fields-terminated-by '@' --lines-terminated-by '\\n'"; + "--fields-terminated-by '@' --lines-terminated-by '\\n'";
Assert.assertEquals(hiveToMysqlExpected, hiveToMysqlScript); Assert.assertEquals(hiveToMysqlExpected, hiveToMysqlScript);
@ -143,7 +146,9 @@ public class SqoopTaskTest {
SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive, SqoopParameters.class); SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive, SqoopParameters.class);
String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams, mysqlTaskExecutionContext); String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams, mysqlTaskExecutionContext);
String mysqlToHiveExpected = String mysqlToHiveExpected =
"sqoop import -D mapred.job.name=sqoop_import -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/test\" --username kylo --password \"123456\" " "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/"
+ "test?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false\" "
+ "--username kylo --password \"123456\" "
+ "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-java id=Integer --hive-import --hive-database stg --hive-table person_internal_2 " + "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-java id=Integer --hive-import --hive-database stg --hive-table person_internal_2 "
+ "--create-hive-table --hive-overwrite --delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16"; + "--create-hive-table --hive-overwrite --delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript); Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript);

1
pom.xml

@ -894,6 +894,7 @@
<include>**/dao/mapper/CommandMapperTest.java</include> <include>**/dao/mapper/CommandMapperTest.java</include>
<include>**/dao/mapper/ConnectionFactoryTest.java</include> <include>**/dao/mapper/ConnectionFactoryTest.java</include>
<include>**/dao/mapper/DataSourceMapperTest.java</include> <include>**/dao/mapper/DataSourceMapperTest.java</include>
<include>**/dao/datasource/MySQLDataSourceTest.java</include>
<include>**/dao/entity/TaskInstanceTest.java</include> <include>**/dao/entity/TaskInstanceTest.java</include>
<include>**/dao/entity/UdfFuncTest.java</include> <include>**/dao/entity/UdfFuncTest.java</include>
<include>**/remote/JsonSerializerTest.java</include> <include>**/remote/JsonSerializerTest.java</include>

Loading…
Cancel
Save