diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java index a68db21752..c4ea3d9271 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java @@ -84,7 +84,7 @@ public class DataSourceController extends BaseController { * * @param loginUser login user * @param jsonStr datasource param - * example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2} + * example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2,"testFlag":0,"bindTestId":1} * @return create result code */ @ApiOperation(value = "createDataSource", notes = "CREATE_DATA_SOURCE_NOTES") @@ -104,7 +104,7 @@ public class DataSourceController extends BaseController { * @param loginUser login user * @param id datasource id * @param jsonStr datasource param - * example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2} + * example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2,"testFlag":0,"bindTestId":1} * @return update result code */ @ApiOperation(value = "updateDataSource", notes = "UPDATE_DATA_SOURCE_NOTES") @@ -148,7 +148,7 @@ public class DataSourceController extends BaseController { } /** - * query datasource by type + * query online/testDatasource by type * * @param loginUser login user * @param type data source type @@ -156,15 +156,17 @@ public class DataSourceController extends BaseController { */ @ApiOperation(value = "queryDataSourceList", notes = "QUERY_DATA_SOURCE_LIST_BY_TYPE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "type", value = "DB_TYPE", required = true, dataType = "DbType") + @ApiImplicitParam(name = "type", value = "DB_TYPE", required = true, dataType = "DbType"), + @ApiImplicitParam(name = "testFlag", value = "DB_TEST_FLAG", required = true, dataType = "DbTestFlag") }) @GetMapping(value = "/list") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_DATASOURCE_ERROR) @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result queryDataSourceList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam("type") DbType type) { - Map result = dataSourceService.queryDataSourceList(loginUser, type.ordinal()); + @RequestParam("type") DbType type, + @RequestParam("testFlag") int testFlag) { + Map result = dataSourceService.queryDataSourceList(loginUser, type.ordinal(), testFlag); return returnDataList(result); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java index 177a8cabb5..c120fb0fc1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java @@ -103,26 +103,28 @@ public class ExecutorController extends BaseController { * @param workerGroup worker group * @param timeout timeout * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode + * @param testFlag testFlag * @return start process result code */ @ApiOperation(value = "startProcessInstance", notes = "RUN_PROCESS_INSTANCE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataTypeClass = long.class, example = "100"), - @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataTypeClass = String.class, example = "2022-04-06 00:00:00,2022-04-06 00:00:00"), - @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataTypeClass = FailureStrategy.class), - @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataTypeClass = String.class), - @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataTypeClass = TaskDependType.class), - @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataTypeClass = CommandType.class), - @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataTypeClass = WarningType.class), - @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", dataTypeClass = int.class, example = "100"), - @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataTypeClass = RunMode.class), - @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataTypeClass = Priority.class), - @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataTypeClass = String.class, example = "default"), - @ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataTypeClass = long.class, example = "-1"), - @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataTypeClass = int.class, example = "100"), - @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataTypeClass = int.class, example = "8"), - @ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataTypeClass = int.class, example = "0"), - @ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataTypeClass = ComplementDependentMode.class) + @ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "100"), + @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String", example = "2022-04-06 00:00:00,2022-04-06 00:00:00"), + @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"), + @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"), + @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"), + @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"), + @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"), + @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", dataType = "Int", example = "100"), + @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"), + @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"), + @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"), + @ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "-1"), + @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"), + @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int" , example = "8"), + @ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType = "Int", example = "0"), + @ApiImplicitParam(name = "testFlag", value = "TEST_FLAG", dataType = "Int", example = "0"), + @ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode") }) @PostMapping(value = "start-process-instance") @ResponseStatus(HttpStatus.OK) @@ -146,6 +148,7 @@ public class ExecutorController extends BaseController { @RequestParam(value = "startParams", required = false) String startParams, @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber, @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun, + @RequestParam(value = "testFlag", defaultValue = "0") int testFlag, @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) { if (timeout == null) { @@ -163,8 +166,7 @@ public class ExecutorController extends BaseController { Map result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy, startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, - workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, - complementDependentMode); + workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, testFlag, complementDependentMode); return returnDataList(result); } @@ -188,26 +190,28 @@ public class ExecutorController extends BaseController { * @param workerGroup worker group * @param timeout timeout * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode + * @param testFlag testFlag * @return start process result code */ @ApiOperation(value = "batchStartProcessInstance", notes = "BATCH_RUN_PROCESS_INSTANCE_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODES", required = true, dataTypeClass = String.class, example = "1,2,3"), - @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataTypeClass = String.class, example = "2022-04-06 00:00:00,2022-04-06 00:00:00"), - @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataTypeClass = FailureStrategy.class), - @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataTypeClass = String.class), - @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataTypeClass = TaskDependType.class), - @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataTypeClass = CommandType.class), - @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataTypeClass = WarningType.class), - @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataTypeClass = int.class, example = "100"), - @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataTypeClass = RunMode.class), - @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataTypeClass = Priority.class), - @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataTypeClass = String.class, example = "default"), - @ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataTypeClass = long.class, example = "-1"), - @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataTypeClass = int.class, example = "100"), - @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataTypeClass = int.class, example = "8"), - @ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataTypeClass = int.class, example = "0"), - @ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataTypeClass = ComplementDependentMode.class) + @ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = "1,2,3"), + @ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String", example = "2022-04-06 00:00:00,2022-04-06 00:00:00"), + @ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"), + @ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"), + @ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"), + @ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"), + @ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"), + @ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"), + @ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"), + @ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"), + @ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "-1"), + @ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"), + @ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8"), + @ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType = "Int", example = "0"), + @ApiImplicitParam(name = "testFlag", value = "TEST_FLAG", dataType = "Int", example = "0"), + @ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode") }) @PostMapping(value = "batch-start-process-instance") @ResponseStatus(HttpStatus.OK) @@ -231,6 +235,7 @@ public class ExecutorController extends BaseController { @RequestParam(value = "startParams", required = false) String startParams, @RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber, @RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun, + @RequestParam(value = "testFlag", defaultValue = "0") int testFlag, @RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) { if (timeout == null) { @@ -259,7 +264,7 @@ public class ExecutorController extends BaseController { result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime, execType, failureStrategy, startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority, - workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, + workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, testFlag, complementDependentMode); if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index 1b2102f6b0..24258705a7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -89,6 +89,7 @@ public class PythonGateway { private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE = TaskDependType.TASK_POST; private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL; private static final int DEFAULT_DRY_RUN = 0; + private static final int DEFAULT_TEST_FLAG = 0; private static final ComplementDependentMode COMPLEMENT_DEPENDENT_MODE = ComplementDependentMode.OFF_MODE; // We use admin user's user_id to skip some permission issue from python gateway service private static final int ADMIN_USER_ID = 1; @@ -255,7 +256,7 @@ public class PythonGateway { processDefinitionCode = processDefinition.getCode(); } - // Fresh process definition schedule + // Fresh process definition schedule if (schedule != null) { createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId); } @@ -357,6 +358,7 @@ public class PythonGateway { null, null, DEFAULT_DRY_RUN, + DEFAULT_TEST_FLAG, COMPLEMENT_DEPENDENT_MODE ); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java index eef169b64e..eaf1552caa 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java @@ -69,13 +69,13 @@ public interface DataSourceService { Result queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize); /** - * query data resource list + * query online/test data resource list * * @param loginUser login user * @param type data source type * @return data source list page */ - Map queryDataSourceList(User loginUser, Integer type); + Map queryDataSourceList(User loginUser, Integer type, int testFlag); /** * verify datasource exists diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java index 8146f96e1c..d63c9a6cb7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java @@ -65,7 +65,7 @@ public interface ExecutorService { RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout, Map startParams, Integer expectedParallelismNumber, - int dryRun, + int dryRun, int testFlag, ComplementDependentMode complementDependentMode); /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java index 98d876b4e7..808f85311d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java @@ -141,6 +141,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource dataSource.setConnectionParams(JSONUtils.toJsonString(connectionParam)); dataSource.setCreateTime(now); dataSource.setUpdateTime(now); + dataSource.setTestFlag(datasourceParam.getTestFlag()); + dataSource.setBindTestId(datasourceParam.getBindTestId()); try { dataSourceMapper.insert(dataSource); putMsg(result, Status.SUCCESS); @@ -211,6 +213,11 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource dataSource.setType(dataSource.getType()); dataSource.setConnectionParams(JSONUtils.toJsonString(connectionParam)); dataSource.setUpdateTime(now); + if(dataSource.getTestFlag() == 1 && dataSourceParam.getTestFlag() == 0){ + clearBindTestId(id); + } + dataSource.setTestFlag(dataSourceParam.getTestFlag()); + dataSource.setBindTestId(dataSourceParam.getBindTestId()); try { dataSourceMapper.updateById(dataSource); logger.info("Update datasource complete, datasourceId:{}, datasourceName:{}.", dataSource.getId(), dataSource.getName()); @@ -249,7 +256,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource baseDataSourceParamDTO.setId(dataSource.getId()); baseDataSourceParamDTO.setName(dataSource.getName()); baseDataSourceParamDTO.setNote(dataSource.getNote()); - + baseDataSourceParamDTO.setTestFlag(dataSource.getTestFlag()); + baseDataSourceParamDTO.setBindTestId(dataSource.getBindTestId()); result.put(Constants.DATA_LIST, baseDataSourceParamDTO); putMsg(result, Status.SUCCESS); return result; @@ -320,12 +328,12 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource * @return data source list page */ @Override - public Map queryDataSourceList(User loginUser, Integer type) { + public Map queryDataSourceList(User loginUser, Integer type, int testFlag) { Map result = new HashMap<>(); List datasourceList = null; if (loginUser.getUserType().equals(UserType.ADMIN_USER)) { - datasourceList = dataSourceMapper.queryDataSourceByType(0, type); + datasourceList = dataSourceMapper.queryDataSourceByType(0, type, testFlag); } else { Set ids = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), logger); if (ids.isEmpty()) { @@ -333,7 +341,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource putMsg(result, Status.SUCCESS); return result; } - datasourceList = dataSourceMapper.selectBatchIds(ids).stream().filter(dataSource -> dataSource.getType().getCode() == type).collect(Collectors.toList()); + datasourceList = dataSourceMapper.selectBatchIds(ids).stream().filter(dataSource -> dataSource.getType().getCode() == type).filter(dataSource -> dataSource.getTestFlag() == testFlag).collect(Collectors.toList()); + } result.put(Constants.DATA_LIST, datasourceList); putMsg(result, Status.SUCCESS); @@ -432,6 +441,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource } dataSourceMapper.deleteById(datasourceId); datasourceUserMapper.deleteByDatasourceId(datasourceId); + clearBindTestId(datasourceId); logger.info("Delete datasource complete, datasourceId:{}.", datasourceId); putMsg(result, Status.SUCCESS); } catch (Exception e) { @@ -679,5 +689,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource } } } + private void clearBindTestId(Integer bindTestId){ + dataSourceMapper.clearBindTestId(bindTestId); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 420f395382..7524a0c15f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -165,6 +165,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param timeout timeout * @param startParams the global param values which pass to new process instance * @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode + * @param testFlag testFlag * @return execute process instance code */ @Override @@ -176,7 +177,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout, Map startParams, Integer expectedParallelismNumber, - int dryRun, ComplementDependentMode complementDependentMode) { + int dryRun, int testFlag, ComplementDependentMode complementDependentMode) { Project project = projectMapper.queryByCode(projectCode); // check user access for project Map result = @@ -223,7 +224,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ startNodeList, cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority, workerGroup, - environmentCode, startParams, expectedParallelismNumber, dryRun, complementDependentMode); + environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag, complementDependentMode); if (create > 0) { processDefinition.setWarningGroupId(warningGroupId); @@ -417,15 +418,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ switch (executeType) { case REPEAT_RUNNING: result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), - processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams); + processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams, processInstance.getTestFlag()); break; case RECOVER_SUSPENDED_PROCESS: result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), - processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams); + processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams, processInstance.getTestFlag()); break; case START_FAILURE_TASK_PROCESS: result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(), - processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams); + processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams, processInstance.getTestFlag()); break; case STOP: if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP) { @@ -606,7 +607,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @return insert result code */ private Map insertCommand(User loginUser, Integer instanceId, long processDefinitionCode, - int processVersion, CommandType commandType, String startParams) { + int processVersion, CommandType commandType, String startParams, int testFlag) { Map result = new HashMap<>(); // To add startParams only when repeat running is needed @@ -623,7 +624,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setExecutorId(loginUser.getId()); command.setProcessDefinitionVersion(processVersion); command.setProcessInstanceId(instanceId); - + command.setTestFlag(testFlag); if (!processService.verifyIsNeedCreateCommand(command)) { logger.warn("Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.", processDefinitionCode, processVersion, instanceId); @@ -702,6 +703,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ * @param runMode runMode * @param processInstancePriority processInstancePriority * @param workerGroup workerGroup + * @param testFlag testFlag * @param environmentCode environmentCode * @return command id */ @@ -710,7 +712,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ WarningType warningType, int executorId, int warningGroupId, RunMode runMode, Priority processInstancePriority, String workerGroup, Long environmentCode, Map startParams, Integer expectedParallelismNumber, int dryRun, - ComplementDependentMode complementDependentMode) { + int testFlag, ComplementDependentMode complementDependentMode) { /** * instantiate command schedule instance @@ -747,6 +749,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ command.setWorkerGroup(workerGroup); command.setEnvironmentCode(environmentCode); command.setDryRun(dryRun); + command.setTestFlag(testFlag); ProcessDefinition processDefinition = processService.findProcessDefinitionByCode(processDefineCode); if (processDefinition != null) { command.setProcessDefinitionVersion(processDefinition.getVersion()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 7dcaee9365..c344badd29 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -361,7 +361,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processId); return result; } - List taskInstanceList = processService.findValidTaskListByProcessId(processId); + List taskInstanceList = processService.findValidTaskListByProcessId(processId, processInstance.getTestFlag()); addDependResultForTaskList(taskInstanceList); Map resultMap = new HashMap<>(); resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString()); @@ -801,7 +801,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce Map timeParams) { Map> localUserDefParams = new HashMap<>(); List taskInstanceList = - taskInstanceMapper.findValidTaskListByProcessId(processInstance.getId(), Flag.YES); + taskInstanceMapper.findValidTaskListByProcessId(processInstance.getId(), Flag.YES, processInstance.getTestFlag()); for (TaskInstance taskInstance : taskInstanceList) { TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion( taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages.properties b/dolphinscheduler-api/src/main/resources/i18n/messages.properties index 5069bb7d33..53a0ca0fda 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages.properties @@ -301,4 +301,4 @@ TASK_DEFINITION_TAG=task definition related operation PROCESS_TASK_RELATION_TAG=process task relation related operation ENVIRONMENT_TAG=environment related operation GET_PROCESS_LIST_BY_PROCESS_CODE_NOTES=query process definition list by project code -GET_TASK_LIST_BY_PROCESS_CODE_NOTES=query task definition list by process definition code \ No newline at end of file +GET_TASK_LIST_BY_PROCESS_CODE_NOTES=query task definition list by process definition code diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties index 17c34d052f..3905de7e04 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties @@ -360,4 +360,4 @@ TASK_DEFINITION_TAG=task definition related operation PROCESS_TASK_RELATION_TAG=process task relation related operation ENVIRONMENT_TAG=environment related operation GET_PROCESS_LIST_BY_PROCESS_CODE_NOTES=query process definition list by project code -GET_TASK_LIST_BY_PROCESS_CODE_NOTES=query task definition list by process definition code \ No newline at end of file +GET_TASK_LIST_BY_PROCESS_CODE_NOTES=query task definition list by process definition code diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties index 223a853307..64cec045cb 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties @@ -357,4 +357,4 @@ TASK_DEFINITION_TAG=任务定义相关操作 PROCESS_TASK_RELATION_TAG=工作流关系相关操作 ENVIRONMENT_TAG=环境相关操作 GET_PROCESS_LIST_BY_PROCESS_CODE_NOTES=通过项目代码查询工作流定义 -GET_TASK_LIST_BY_PROCESS_CODE_NOTES=通过工作流定义代码查询任务定义 \ No newline at end of file +GET_TASK_LIST_BY_PROCESS_CODE_NOTES=通过工作流定义代码查询任务定义 diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java index 7faed0bde0..f55aa9f8be 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java @@ -66,6 +66,7 @@ public class ExecutorControllerTest extends AbstractControllerTest { final ImmutableMap startParams = ImmutableMap.of("start", "params"); final Integer expectedParallelismNumber = 6; final int dryRun = 7; + final int testFlag = 0; final ComplementDependentMode complementDependentMode = ComplementDependentMode.OFF_MODE; final JsonObject expectResponseContent = gson @@ -98,12 +99,12 @@ public class ExecutorControllerTest extends AbstractControllerTest { paramsMap.add("startParams", gson.toJson(startParams)); paramsMap.add("expectedParallelismNumber", String.valueOf(expectedParallelismNumber)); paramsMap.add("dryRun", String.valueOf(dryRun)); - + paramsMap.add("testFlag", String.valueOf(testFlag)); when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode), eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), eq(warningType), eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode), - eq(timeout), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(complementDependentMode))) + eq(timeout), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag), eq(complementDependentMode))) .thenReturn(executeServiceResult); //When @@ -138,11 +139,12 @@ public class ExecutorControllerTest extends AbstractControllerTest { paramsMap.add("startParams", gson.toJson(startParams)); paramsMap.add("expectedParallelismNumber", String.valueOf(expectedParallelismNumber)); paramsMap.add("dryRun", String.valueOf(dryRun)); + paramsMap.add("testFlag", String.valueOf(testFlag)); when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode), eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), eq(warningType), eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode), - eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), + eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag), eq(complementDependentMode))).thenReturn(executeServiceResult); //When @@ -176,11 +178,12 @@ public class ExecutorControllerTest extends AbstractControllerTest { paramsMap.add("timeout", String.valueOf(timeout)); paramsMap.add("expectedParallelismNumber", String.valueOf(expectedParallelismNumber)); paramsMap.add("dryRun", String.valueOf(dryRun)); + paramsMap.add("testFlag", String.valueOf(testFlag)); when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode), eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), eq(warningType), eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode), - eq(timeout), eq(null), eq(expectedParallelismNumber), eq(dryRun), + eq(timeout), eq(null), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag), eq(complementDependentMode))).thenReturn(executeServiceResult); //When @@ -207,7 +210,7 @@ public class ExecutorControllerTest extends AbstractControllerTest { when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode), eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType), eq(0), eq(null), eq(null), eq("default"), eq(-1L), - eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), + eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), eq(0), eq(complementDependentMode))).thenReturn(executeServiceResult); //When diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java index cd7924204c..d5e6ac554e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java @@ -315,7 +315,7 @@ public class DataSourceServiceTest { DataSource dataSource = new DataSource(); dataSource.setType(DbType.MYSQL); Mockito.when(dataSourceMapper.selectBatchIds(dataSourceIds)).thenReturn(Collections.singletonList(dataSource)); - Map map = dataSourceService.queryDataSourceList(loginUser, DbType.MYSQL.ordinal()); + Map map = dataSourceService.queryDataSourceList(loginUser, DbType.MYSQL.ordinal(),Constants.TEST_FLAG_NO); Assert.assertEquals(Status.SUCCESS, map.get(Constants.STATUS)); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index 8cd5d681cc..e50fb31ce9 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -222,7 +222,7 @@ public class ExecutorServiceTest { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); @@ -244,7 +244,7 @@ public class ExecutorServiceTest { null, "n1,n2", null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); @@ -310,7 +310,7 @@ public class ExecutorServiceTest { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS)); verify(processService, times(0)).createCommand(any(Command.class)); @@ -331,7 +331,7 @@ public class ExecutorServiceTest { null, null, null, null, 0, RunMode.RUN_MODE_SERIAL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(1)).createCommand(any(Command.class)); @@ -352,7 +352,7 @@ public class ExecutorServiceTest { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(31)).createCommand(any(Command.class)); @@ -374,7 +374,7 @@ public class ExecutorServiceTest { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); verify(processService, times(15)).createCommand(any(Command.class)); @@ -392,7 +392,7 @@ public class ExecutorServiceTest { null, null, null, null, 0, RunMode.RUN_MODE_PARALLEL, - Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO, ComplementDependentMode.OFF_MODE); Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS); @@ -408,6 +408,25 @@ public class ExecutorServiceTest { Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); } + @Test + public void testOfTestRun(){ + Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true); + Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN)) + .thenReturn(checkProjectAndAuth()); + Map result = executorService.execProcessInstance(loginUser, projectCode, + processDefinitionCode, + "{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}", + CommandType.COMPLEMENT_DATA, + null, null, + null, null, 0, + RunMode.RUN_MODE_PARALLEL, + Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_YES, + ComplementDependentMode.OFF_MODE); + Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS)); + } + + + @Test public void testStartCheckByProcessDefinedCode() { List ids = new ArrayList<>(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index 48dd44e468..a3b604066e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -356,9 +356,8 @@ public class ProcessInstanceServiceTest { res.setData("xxx"); when(projectMapper.queryByCode(projectCode)).thenReturn(project); when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result); - when(processService.findProcessInstanceDetailById(processInstance.getId())) - .thenReturn(Optional.of(processInstance)); - when(processService.findValidTaskListByProcessId(processInstance.getId())).thenReturn(taskInstanceList); + when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(Optional.of(processInstance)); + when(processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag())).thenReturn(taskInstanceList); when(loggerService.queryLog(taskInstance.getId(), 0, 4098)).thenReturn(res); Map successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1); Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 3d7196bae0..9f919204ac 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -851,6 +851,13 @@ public final class Constants { public static final String SECURITY_CONFIG_TYPE_PASSWORD = "PASSWORD"; public static final String SECURITY_CONFIG_TYPE_LDAP = "LDAP"; + + /** + * test flag + */ + public static final int TEST_FLAG_NO = 0; + public static final int TEST_FLAG_YES = 1; + /** * Task Types */ @@ -861,4 +868,6 @@ public final class Constants { public static final String TYPE_DATA_QUALITY = "DataQuality"; public static final String TYPE_OTHER = "Other"; public static final String TYPE_MACHINE_LEARNING = "MachineLearning"; + + public static final String DATASOUCE = "datasource"; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java index ec9336d501..290be4377c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java @@ -93,6 +93,12 @@ public class Command { @TableField("process_definition_version") private int processDefinitionVersion; + /** + * test flag + */ + @TableField("test_flag") + private int testFlag; + public Command() { this.taskDependType = TaskDependType.TASK_POST; this.failureStrategy = FailureStrategy.CONTINUE; @@ -115,7 +121,8 @@ public class Command { Priority processInstancePriority, int dryRun, int processInstanceId, - int processDefinitionVersion) { + int processDefinitionVersion, + int testFlag) { this.commandType = commandType; this.executorId = executorId; this.processDefinitionCode = processDefinitionCode; @@ -133,6 +140,6 @@ public class Command { this.dryRun = dryRun; this.processInstanceId = processInstanceId; this.processDefinitionVersion = processDefinitionVersion; + this.testFlag = testFlag; } - } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DataSource.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DataSource.java index 8a86ff587c..a80a7e06d3 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DataSource.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DataSource.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.dao.entity; +import com.baomidou.mybatisplus.annotation.*; import org.apache.dolphinscheduler.spi.enums.DbType; import java.util.Date; @@ -79,6 +80,17 @@ public class DataSource { */ private Date updateTime; + /** + * test flag + */ + protected int testFlag; + + /** + * bind test data source id + */ + @TableField(fill = FieldFill.INSERT_UPDATE) + protected Integer bindTestId; + @Override public boolean equals(Object o) { if (this == o) { diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java index 0d2eedecd2..d83a3dc624 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.dao.entity; +import com.baomidou.mybatisplus.annotation.TableField; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.FailureStrategy; import org.apache.dolphinscheduler.common.enums.Priority; @@ -121,9 +122,14 @@ public class ErrorCommand { */ private int dryRun; + /** + * test flag + */ + @TableField("test_flag") + private int testFlag; + public ErrorCommand() { } - public ErrorCommand(Command command, String message) { this.id = command.getId(); this.commandType = command.getCommandType(); @@ -141,5 +147,6 @@ public class ErrorCommand { this.processInstancePriority = command.getProcessInstancePriority(); this.message = message; this.dryRun = command.getDryRun(); + this.testFlag = command.getTestFlag(); } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java index db08b2bd26..6b83ef928b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java @@ -276,6 +276,11 @@ public class ProcessInstance { @TableField(exist = false) private boolean isBlocked; + /** + * test flag + */ + private int testFlag; + /** * set the process name with process define version and timestamp * diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 08ab6d11ac..3236b3d158 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -300,6 +300,11 @@ public class TaskInstance implements Serializable { */ private TaskExecuteType taskExecuteType; + /** + * test flag + */ + private int testFlag; + public void init(String host, Date startTime, String executePath) { this.host = host; this.startTime = startTime; diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java index fb6a514d42..b70c5f801c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java @@ -34,12 +34,12 @@ import com.baomidou.mybatisplus.core.metadata.IPage; public interface DataSourceMapper extends BaseMapper { /** - * query datasource by type + * query online/testDatasource by type * @param userId userId * @param type type * @return datasource list */ - List queryDataSourceByType(@Param("userId") int userId, @Param("type") Integer type); + List queryDataSourceByType(@Param("userId") int userId, @Param("type") Integer type, @Param("testFlag") int testFlag); /** * datasource page @@ -90,7 +90,7 @@ public interface DataSourceMapper extends BaseMapper { * @param T * @return UDF function list */ - List listAuthorizedDataSource(@Param("userId") int userId,@Param("dataSourceIds")T[] dataSourceIds); + List listAuthorizedDataSource(@Param("userId") int userId, @Param("dataSourceIds") T[] dataSourceIds); /** * query datasource by name and user id @@ -108,5 +108,19 @@ public interface DataSourceMapper extends BaseMapper { * @param searchVal * @return */ - IPage selectPagingByIds(Page dataSourcePage, @Param("dataSourceIds")List dataSourceIds, @Param("name")String name); + IPage selectPagingByIds(Page dataSourcePage, @Param("dataSourceIds") List dataSourceIds, @Param("name") String name); + + /** + * clearBindTestId + * @param bindTestId + * @return + */ + void clearBindTestId(@Param("bindTestId") Integer bindTestId); + + /** + * queryTestDataSourceId + * @param onlineDataSourceId + * @return Integer + */ + Integer queryTestDataSourceId(@Param("dataSourceId") Integer onlineDataSourceId); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index a667f0fc4c..eb1758c949 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -187,11 +187,13 @@ public interface ProcessInstanceMapper extends BaseMapper { * @param definitionCode definitionCode * @param startTime startTime * @param endTime endTime + * @param testFlag testFlag * @return process instance */ ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionCode") Long definitionCode, @Param("startTime") Date startTime, - @Param("endTime") Date endTime); + @Param("endTime") Date endTime, + @Param("testFlag") int testFlag); /** * query last running process instance @@ -199,12 +201,14 @@ public interface ProcessInstanceMapper extends BaseMapper { * @param definitionCode definitionCode * @param startTime startTime * @param endTime endTime + * @param testFlag testFlag * @param stateArray stateArray * @return process instance */ ProcessInstance queryLastRunningProcess(@Param("processDefinitionCode") Long definitionCode, @Param("startTime") Date startTime, @Param("endTime") Date endTime, + @Param("testFlag") int testFlag, @Param("states") int[] stateArray); /** @@ -213,11 +217,13 @@ public interface ProcessInstanceMapper extends BaseMapper { * @param definitionCode definitionCode * @param startTime startTime * @param endTime endTime + * @param testFlag testFlag * @return process instance */ ProcessInstance queryLastManualProcess(@Param("processDefinitionCode") Long definitionCode, @Param("startTime") Date startTime, - @Param("endTime") Date endTime); + @Param("endTime") Date endTime, + @Param("testFlag") int testFlag); /** * query top n process instance order by running duration diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java index 2ab78f5151..cdc26ee4e6 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java @@ -38,7 +38,8 @@ public interface TaskInstanceMapper extends BaseMapper { @Param("state") Integer state); List findValidTaskListByProcessId(@Param("processInstanceId") Integer processInstanceId, - @Param("flag") Flag flag); + @Param("flag") Flag flag, + @Param("testFlag") int testFlag); List queryByHostAndStatus(@Param("host") String host, @Param("states") int[] stateArray); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java index 498abcb3a4..1f6ffdfd09 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java @@ -75,6 +75,7 @@ public class TaskInstanceUtils { target.setCpuQuota(source.getCpuQuota()); target.setMemoryMax(source.getMemoryMax()); target.setTaskExecuteType(source.getTaskExecuteType()); + target.setTestFlag(source.getTestFlag()); } } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml index f5ce9bd2d6..219a22a53a 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml @@ -26,6 +26,7 @@ from t_ds_datasource where type=#{type} + and test_flag=#{testFlag} and id in (select datasource_id @@ -40,7 +41,7 @@ @@ -115,7 +116,7 @@ - + + + update t_ds_datasource d set bind_test_id=null where bind_test_id=#{bindTestId} + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index 87e71a85f2..d5a9136b40 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -24,7 +24,7 @@ warning_group_id, schedule_time, command_start_time, global_params, flag, update_time, is_sub_process, executor_id, history_cmd, process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool, - dry_run, next_process_instance_id, restart_time, state_history + dry_run, test_flag, next_process_instance_id, restart_time, state_history select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version, instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time, - instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run ,instance.next_process_instance_id, + instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run , instance.test_flag ,instance.next_process_instance_id, restart_time, instance.state_history from t_ds_process_instance instance join t_ds_process_definition define ON instance.process_definition_code = define.code @@ -200,7 +200,7 @@ select from t_ds_process_instance - where process_definition_code=#{processDefinitionCode} + where process_definition_code=#{processDefinitionCode} and test_flag=#{testFlag} and schedule_time = ]]> #{startTime} and schedule_time #{endTime} @@ -210,7 +210,7 @@ select from t_ds_process_instance - where process_definition_code=#{processDefinitionCode} + where process_definition_code=#{processDefinitionCode} and test_flag=#{testFlag} and state in @@ -227,7 +227,7 @@ select from t_ds_process_instance - where process_definition_code=#{processDefinitionCode} + where process_definition_code=#{processDefinitionCode} and test_flag=#{testFlag} and schedule_time is null and start_time = ]]> #{startTime} and start_time #{endTime} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index aa13818213..dafcc98953 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -22,13 +22,13 @@ id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time, start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link, flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id, - first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id, cpu_quota, memory_max, task_execute_type + first_submit_time, delay_time, task_params, var_pool, dry_run, test_flag, task_group_id, cpu_quota, memory_max, task_execute_type ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time, ${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link, ${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id, - ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.task_group_id, ${alias}.task_execute_type + ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.test_flag, ${alias}.task_group_id, ${alias}.task_execute_type update t_ds_task_instance @@ -54,6 +54,7 @@ from t_ds_task_instance WHERE process_instance_id = #{processInstanceId} and flag = #{flag} + and test_flag=#{testFlag} order by start_time desc