Browse Source

[Feature-11473][Task]Support test task (#11670)

* [improve] support test tasks

* [improve] support test tasks

* [improve] support test tasks

* [improve] support test tasks

* [improve] support test tasks

* Update TaskExecuteThread.java

* try solve e2e q

* try solve e2e q

* try solve e2e q

* try solve e2e q

* try solve e2e q

* try solve e2e q

* try solve e2e q

* try solve e2e q

* try solve e2e q

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* Update DataSource.java

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* Update messages_zh_CN.properties

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* Update messages.properties

* Update messages_en_US.properties

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks

* [Feature] support test tasks
3.2.0-release
insist777 2 years ago committed by GitHub
parent
commit
5b384f3fab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java
  2. 75
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
  3. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
  4. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java
  5. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
  6. 21
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java
  7. 19
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  8. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
  9. 2
      dolphinscheduler-api/src/main/resources/i18n/messages.properties
  10. 2
      dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties
  11. 2
      dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties
  12. 13
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java
  13. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java
  14. 33
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
  15. 5
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
  16. 9
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  17. 11
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java
  18. 12
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DataSource.java
  19. 9
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java
  20. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
  21. 5
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  22. 22
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java
  23. 10
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
  24. 3
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java
  25. 1
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
  26. 16
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml
  27. 10
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
  28. 5
      dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml
  29. 6
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  30. 6
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  31. 5
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  32. 122
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql
  33. 16
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_dml.sql
  34. 121
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql
  35. 29
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_dml.sql
  36. 4
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java
  37. 12
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
  38. 8
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java
  39. 20
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/BaseDataSourceParamDTO.java
  40. 1
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java
  41. 4
      dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/ClickhouseDataSourceE2ETest.java
  42. 3
      dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/HiveDataSourceE2ETest.java
  43. 3
      dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/MysqlDataSourceE2ETest.java
  44. 3
      dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/PostgresDataSourceE2ETest.java
  45. 3
      dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/SqlServerDataSourceE2ETest.java
  46. 16
      dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java
  47. 1
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  48. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java
  49. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  50. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java
  51. 36
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
  52. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java
  53. 10
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java
  54. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java
  55. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
  56. 33
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java
  57. 3
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
  58. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java
  59. 25
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java
  60. 84
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
  61. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java
  62. 10
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  63. 52
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java
  64. 35
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java
  65. 5
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
  66. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
  67. 1
      dolphinscheduler-ui/src/common/common.ts
  68. 10
      dolphinscheduler-ui/src/locales/en_US/datasource.ts
  69. 7
      dolphinscheduler-ui/src/locales/en_US/project.ts
  70. 10
      dolphinscheduler-ui/src/locales/zh_CN/datasource.ts
  71. 7
      dolphinscheduler-ui/src/locales/zh_CN/project.ts
  72. 2
      dolphinscheduler-ui/src/service/modules/data-source/types.ts
  73. 1
      dolphinscheduler-ui/src/service/modules/process-instances/types.ts
  74. 45
      dolphinscheduler-ui/src/utils/environmental-distinction.ts
  75. 37
      dolphinscheduler-ui/src/views/datasource/list/detail.tsx
  76. 10
      dolphinscheduler-ui/src/views/datasource/list/use-columns.ts
  77. 60
      dolphinscheduler-ui/src/views/datasource/list/use-form.ts
  78. 6
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts
  79. 5
      dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sqoop-datasource.ts
  80. 1
      dolphinscheduler-ui/src/views/projects/task/instance/types.ts
  81. 24
      dolphinscheduler-ui/src/views/projects/task/instance/use-table.ts
  82. 7
      dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx
  83. 3
      dolphinscheduler-ui/src/views/projects/workflow/definition/components/use-form.ts
  84. 16
      dolphinscheduler-ui/src/views/projects/workflow/instance/use-table.ts
  85. 25
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/DataSourceController.java

@ -84,7 +84,7 @@ public class DataSourceController extends BaseController {
*
* @param loginUser login user
* @param jsonStr datasource param
* example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2}
* example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2,"testFlag":0,"bindTestId":1}
* @return create result code
*/
@ApiOperation(value = "createDataSource", notes = "CREATE_DATA_SOURCE_NOTES")
@ -104,7 +104,7 @@ public class DataSourceController extends BaseController {
* @param loginUser login user
* @param id datasource id
* @param jsonStr datasource param
* example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2}
* example: {"type":"MYSQL","name":"txx","note":"","host":"localhost","port":3306,"principal":"","javaSecurityKrb5Conf":"","loginUserKeytabUsername":"","loginUserKeytabPath":"","userName":"root","password":"xxx","database":"ds","connectType":"","other":{"serverTimezone":"GMT-8"},"id":2,"testFlag":0,"bindTestId":1}
* @return update result code
*/
@ApiOperation(value = "updateDataSource", notes = "UPDATE_DATA_SOURCE_NOTES")
@ -148,7 +148,7 @@ public class DataSourceController extends BaseController {
}
/**
* query datasource by type
* query online/testDatasource by type
*
* @param loginUser login user
* @param type data source type
@ -156,15 +156,17 @@ public class DataSourceController extends BaseController {
*/
@ApiOperation(value = "queryDataSourceList", notes = "QUERY_DATA_SOURCE_LIST_BY_TYPE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "type", value = "DB_TYPE", required = true, dataType = "DbType")
@ApiImplicitParam(name = "type", value = "DB_TYPE", required = true, dataType = "DbType"),
@ApiImplicitParam(name = "testFlag", value = "DB_TEST_FLAG", required = true, dataType = "DbTestFlag")
})
@GetMapping(value = "/list")
@ResponseStatus(HttpStatus.OK)
@ApiException(QUERY_DATASOURCE_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result queryDataSourceList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam("type") DbType type) {
Map<String, Object> result = dataSourceService.queryDataSourceList(loginUser, type.ordinal());
@RequestParam("type") DbType type,
@RequestParam("testFlag") int testFlag) {
Map<String, Object> result = dataSourceService.queryDataSourceList(loginUser, type.ordinal(), testFlag);
return returnDataList(result);
}

75
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java

@ -103,26 +103,28 @@ public class ExecutorController extends BaseController {
* @param workerGroup worker group
* @param timeout timeout
* @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode
* @param testFlag testFlag
* @return start process result code
*/
@ApiOperation(value = "startProcessInstance", notes = "RUN_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataTypeClass = long.class, example = "100"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataTypeClass = String.class, example = "2022-04-06 00:00:00,2022-04-06 00:00:00"),
@ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataTypeClass = FailureStrategy.class),
@ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataTypeClass = String.class),
@ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataTypeClass = TaskDependType.class),
@ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataTypeClass = CommandType.class),
@ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataTypeClass = WarningType.class),
@ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", dataTypeClass = int.class, example = "100"),
@ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataTypeClass = RunMode.class),
@ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataTypeClass = Priority.class),
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataTypeClass = String.class, example = "default"),
@ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataTypeClass = long.class, example = "-1"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataTypeClass = int.class, example = "100"),
@ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataTypeClass = int.class, example = "8"),
@ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataTypeClass = int.class, example = "0"),
@ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataTypeClass = ComplementDependentMode.class)
@ApiImplicitParam(name = "processDefinitionCode", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "100"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String", example = "2022-04-06 00:00:00,2022-04-06 00:00:00"),
@ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
@ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"),
@ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"),
@ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"),
@ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"),
@ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"),
@ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"),
@ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int" , example = "8"),
@ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType = "Int", example = "0"),
@ApiImplicitParam(name = "testFlag", value = "TEST_FLAG", dataType = "Int", example = "0"),
@ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode")
})
@PostMapping(value = "start-process-instance")
@ResponseStatus(HttpStatus.OK)
@ -146,6 +148,7 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) {
if (timeout == null) {
@ -163,8 +166,7 @@ public class ExecutorController extends BaseController {
Map<String, Object> result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode,
scheduleTime, execType, failureStrategy,
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun,
complementDependentMode);
workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, testFlag, complementDependentMode);
return returnDataList(result);
}
@ -188,26 +190,28 @@ public class ExecutorController extends BaseController {
* @param workerGroup worker group
* @param timeout timeout
* @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode
* @param testFlag testFlag
* @return start process result code
*/
@ApiOperation(value = "batchStartProcessInstance", notes = "BATCH_RUN_PROCESS_INSTANCE_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODES", required = true, dataTypeClass = String.class, example = "1,2,3"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataTypeClass = String.class, example = "2022-04-06 00:00:00,2022-04-06 00:00:00"),
@ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataTypeClass = FailureStrategy.class),
@ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataTypeClass = String.class),
@ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataTypeClass = TaskDependType.class),
@ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataTypeClass = CommandType.class),
@ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataTypeClass = WarningType.class),
@ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataTypeClass = int.class, example = "100"),
@ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataTypeClass = RunMode.class),
@ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataTypeClass = Priority.class),
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataTypeClass = String.class, example = "default"),
@ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataTypeClass = long.class, example = "-1"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataTypeClass = int.class, example = "100"),
@ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataTypeClass = int.class, example = "8"),
@ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataTypeClass = int.class, example = "0"),
@ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataTypeClass = ComplementDependentMode.class)
@ApiImplicitParam(name = "processDefinitionCodes", value = "PROCESS_DEFINITION_CODES", required = true, dataType = "String", example = "1,2,3"),
@ApiImplicitParam(name = "scheduleTime", value = "SCHEDULE_TIME", required = true, dataType = "String", example = "2022-04-06 00:00:00,2022-04-06 00:00:00"),
@ApiImplicitParam(name = "failureStrategy", value = "FAILURE_STRATEGY", required = true, dataType = "FailureStrategy"),
@ApiImplicitParam(name = "startNodeList", value = "START_NODE_LIST", dataType = "String"),
@ApiImplicitParam(name = "taskDependType", value = "TASK_DEPEND_TYPE", dataType = "TaskDependType"),
@ApiImplicitParam(name = "execType", value = "COMMAND_TYPE", dataType = "CommandType"),
@ApiImplicitParam(name = "warningType", value = "WARNING_TYPE", required = true, dataType = "WarningType"),
@ApiImplicitParam(name = "warningGroupId", value = "WARNING_GROUP_ID", required = true, dataType = "Int", example = "100"),
@ApiImplicitParam(name = "runMode", value = "RUN_MODE", dataType = "RunMode"),
@ApiImplicitParam(name = "processInstancePriority", value = "PROCESS_INSTANCE_PRIORITY", required = true, dataType = "Priority"),
@ApiImplicitParam(name = "workerGroup", value = "WORKER_GROUP", dataType = "String", example = "default"),
@ApiImplicitParam(name = "environmentCode", value = "ENVIRONMENT_CODE", dataType = "Long", example = "-1"),
@ApiImplicitParam(name = "timeout", value = "TIMEOUT", dataType = "Int", example = "100"),
@ApiImplicitParam(name = "expectedParallelismNumber", value = "EXPECTED_PARALLELISM_NUMBER", dataType = "Int", example = "8"),
@ApiImplicitParam(name = "dryRun", value = "DRY_RUN", dataType = "Int", example = "0"),
@ApiImplicitParam(name = "testFlag", value = "TEST_FLAG", dataType = "Int", example = "0"),
@ApiImplicitParam(name = "complementDependentMode", value = "COMPLEMENT_DEPENDENT_MODE", dataType = "complementDependentMode")
})
@PostMapping(value = "batch-start-process-instance")
@ResponseStatus(HttpStatus.OK)
@ -231,6 +235,7 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "startParams", required = false) String startParams,
@RequestParam(value = "expectedParallelismNumber", required = false) Integer expectedParallelismNumber,
@RequestParam(value = "dryRun", defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag", defaultValue = "0") int testFlag,
@RequestParam(value = "complementDependentMode", required = false) ComplementDependentMode complementDependentMode) {
if (timeout == null) {
@ -259,7 +264,7 @@ public class ExecutorController extends BaseController {
result = execService.execProcessInstance(loginUser, projectCode, processDefinitionCode, scheduleTime,
execType, failureStrategy,
startNodeList, taskDependType, warningType, warningGroupId, runMode, processInstancePriority,
workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun,
workerGroup, environmentCode, timeout, startParamMap, expectedParallelismNumber, dryRun, testFlag,
complementDependentMode);
if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java

@ -89,6 +89,7 @@ public class PythonGateway {
private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE = TaskDependType.TASK_POST;
private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL;
private static final int DEFAULT_DRY_RUN = 0;
private static final int DEFAULT_TEST_FLAG = 0;
private static final ComplementDependentMode COMPLEMENT_DEPENDENT_MODE = ComplementDependentMode.OFF_MODE;
// We use admin user's user_id to skip some permission issue from python gateway service
private static final int ADMIN_USER_ID = 1;
@ -255,7 +256,7 @@ public class PythonGateway {
processDefinitionCode = processDefinition.getCode();
}
// Fresh process definition schedule
// Fresh process definition schedule
if (schedule != null) {
createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId);
}
@ -357,6 +358,7 @@ public class PythonGateway {
null,
null,
DEFAULT_DRY_RUN,
DEFAULT_TEST_FLAG,
COMPLEMENT_DEPENDENT_MODE
);
}

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/DataSourceService.java

@ -69,13 +69,13 @@ public interface DataSourceService {
Result queryDataSourceListPaging(User loginUser, String searchVal, Integer pageNo, Integer pageSize);
/**
* query data resource list
* query online/test data resource list
*
* @param loginUser login user
* @param type data source type
* @return data source list page
*/
Map<String, Object> queryDataSourceList(User loginUser, Integer type);
Map<String, Object> queryDataSourceList(User loginUser, Integer type, int testFlag);
/**
* verify datasource exists

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java

@ -65,7 +65,7 @@ public interface ExecutorService {
RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber,
int dryRun,
int dryRun, int testFlag,
ComplementDependentMode complementDependentMode);
/**

21
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/DataSourceServiceImpl.java

@ -141,6 +141,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
dataSource.setConnectionParams(JSONUtils.toJsonString(connectionParam));
dataSource.setCreateTime(now);
dataSource.setUpdateTime(now);
dataSource.setTestFlag(datasourceParam.getTestFlag());
dataSource.setBindTestId(datasourceParam.getBindTestId());
try {
dataSourceMapper.insert(dataSource);
putMsg(result, Status.SUCCESS);
@ -211,6 +213,11 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
dataSource.setType(dataSource.getType());
dataSource.setConnectionParams(JSONUtils.toJsonString(connectionParam));
dataSource.setUpdateTime(now);
if(dataSource.getTestFlag() == 1 && dataSourceParam.getTestFlag() == 0){
clearBindTestId(id);
}
dataSource.setTestFlag(dataSourceParam.getTestFlag());
dataSource.setBindTestId(dataSourceParam.getBindTestId());
try {
dataSourceMapper.updateById(dataSource);
logger.info("Update datasource complete, datasourceId:{}, datasourceName:{}.", dataSource.getId(), dataSource.getName());
@ -249,7 +256,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
baseDataSourceParamDTO.setId(dataSource.getId());
baseDataSourceParamDTO.setName(dataSource.getName());
baseDataSourceParamDTO.setNote(dataSource.getNote());
baseDataSourceParamDTO.setTestFlag(dataSource.getTestFlag());
baseDataSourceParamDTO.setBindTestId(dataSource.getBindTestId());
result.put(Constants.DATA_LIST, baseDataSourceParamDTO);
putMsg(result, Status.SUCCESS);
return result;
@ -320,12 +328,12 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
* @return data source list page
*/
@Override
public Map<String, Object> queryDataSourceList(User loginUser, Integer type) {
public Map<String, Object> queryDataSourceList(User loginUser, Integer type, int testFlag) {
Map<String, Object> result = new HashMap<>();
List<DataSource> datasourceList = null;
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
datasourceList = dataSourceMapper.queryDataSourceByType(0, type);
datasourceList = dataSourceMapper.queryDataSourceByType(0, type, testFlag);
} else {
Set<Integer> ids = resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.DATASOURCE, loginUser.getId(), logger);
if (ids.isEmpty()) {
@ -333,7 +341,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
putMsg(result, Status.SUCCESS);
return result;
}
datasourceList = dataSourceMapper.selectBatchIds(ids).stream().filter(dataSource -> dataSource.getType().getCode() == type).collect(Collectors.toList());
datasourceList = dataSourceMapper.selectBatchIds(ids).stream().filter(dataSource -> dataSource.getType().getCode() == type).filter(dataSource -> dataSource.getTestFlag() == testFlag).collect(Collectors.toList());
}
result.put(Constants.DATA_LIST, datasourceList);
putMsg(result, Status.SUCCESS);
@ -432,6 +441,7 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
}
dataSourceMapper.deleteById(datasourceId);
datasourceUserMapper.deleteByDatasourceId(datasourceId);
clearBindTestId(datasourceId);
logger.info("Delete datasource complete, datasourceId:{}.", datasourceId);
putMsg(result, Status.SUCCESS);
} catch (Exception e) {
@ -679,5 +689,8 @@ public class DataSourceServiceImpl extends BaseServiceImpl implements DataSource
}
}
}
private void clearBindTestId(Integer bindTestId){
dataSourceMapper.clearBindTestId(bindTestId);
}
}

19
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -165,6 +165,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @param timeout timeout
* @param startParams the global param values which pass to new process instance
* @param expectedParallelismNumber the expected parallelism number when execute complement in parallel mode
* @param testFlag testFlag
* @return execute process instance code
*/
@Override
@ -176,7 +177,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
Priority processInstancePriority, String workerGroup,
Long environmentCode, Integer timeout,
Map<String, String> startParams, Integer expectedParallelismNumber,
int dryRun, ComplementDependentMode complementDependentMode) {
int dryRun, int testFlag, ComplementDependentMode complementDependentMode) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
@ -223,7 +224,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
startNodeList,
cronTime, warningType, loginUser.getId(), warningGroupId, runMode, processInstancePriority,
workerGroup,
environmentCode, startParams, expectedParallelismNumber, dryRun, complementDependentMode);
environmentCode, startParams, expectedParallelismNumber, dryRun, testFlag, complementDependentMode);
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId);
@ -417,15 +418,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
switch (executeType) {
case REPEAT_RUNNING:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams);
processDefinition.getVersion(), CommandType.REPEAT_RUNNING, startParams, processInstance.getTestFlag());
break;
case RECOVER_SUSPENDED_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams);
processDefinition.getVersion(), CommandType.RECOVER_SUSPENDED_PROCESS, startParams, processInstance.getTestFlag());
break;
case START_FAILURE_TASK_PROCESS:
result = insertCommand(loginUser, processInstanceId, processDefinition.getCode(),
processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams);
processDefinition.getVersion(), CommandType.START_FAILURE_TASK_PROCESS, startParams, processInstance.getTestFlag());
break;
case STOP:
if (processInstance.getState() == WorkflowExecutionStatus.READY_STOP) {
@ -606,7 +607,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @return insert result code
*/
private Map<String, Object> insertCommand(User loginUser, Integer instanceId, long processDefinitionCode,
int processVersion, CommandType commandType, String startParams) {
int processVersion, CommandType commandType, String startParams, int testFlag) {
Map<String, Object> result = new HashMap<>();
// To add startParams only when repeat running is needed
@ -623,7 +624,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setExecutorId(loginUser.getId());
command.setProcessDefinitionVersion(processVersion);
command.setProcessInstanceId(instanceId);
command.setTestFlag(testFlag);
if (!processService.verifyIsNeedCreateCommand(command)) {
logger.warn("Process instance is executing the command, processDefinitionCode:{}, processDefinitionVersion:{}, processInstanceId:{}.",
processDefinitionCode, processVersion, instanceId);
@ -702,6 +703,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
* @param runMode runMode
* @param processInstancePriority processInstancePriority
* @param workerGroup workerGroup
* @param testFlag testFlag
* @param environmentCode environmentCode
* @return command id
*/
@ -710,7 +712,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
WarningType warningType, int executorId, int warningGroupId, RunMode runMode,
Priority processInstancePriority, String workerGroup, Long environmentCode,
Map<String, String> startParams, Integer expectedParallelismNumber, int dryRun,
ComplementDependentMode complementDependentMode) {
int testFlag, ComplementDependentMode complementDependentMode) {
/**
* instantiate command schedule instance
@ -747,6 +749,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
command.setWorkerGroup(workerGroup);
command.setEnvironmentCode(environmentCode);
command.setDryRun(dryRun);
command.setTestFlag(testFlag);
ProcessDefinition processDefinition = processService.findProcessDefinitionByCode(processDefineCode);
if (processDefinition != null) {
command.setProcessDefinitionVersion(processDefinition.getVersion());

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java

@ -361,7 +361,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
putMsg(result, PROCESS_INSTANCE_NOT_EXIST, processId);
return result;
}
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId);
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processId, processInstance.getTestFlag());
addDependResultForTaskList(taskInstanceList);
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(PROCESS_INSTANCE_STATE, processInstance.getState().toString());
@ -801,7 +801,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
Map<String, String> timeParams) {
Map<String, Map<String, Object>> localUserDefParams = new HashMap<>();
List<TaskInstance> taskInstanceList =
taskInstanceMapper.findValidTaskListByProcessId(processInstance.getId(), Flag.YES);
taskInstanceMapper.findValidTaskListByProcessId(processInstance.getId(), Flag.YES, processInstance.getTestFlag());
for (TaskInstance taskInstance : taskInstanceList) {
TaskDefinitionLog taskDefinitionLog = taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());

2
dolphinscheduler-api/src/main/resources/i18n/messages.properties

@ -301,4 +301,4 @@ TASK_DEFINITION_TAG=task definition related operation
PROCESS_TASK_RELATION_TAG=process task relation related operation
ENVIRONMENT_TAG=environment related operation
GET_PROCESS_LIST_BY_PROCESS_CODE_NOTES=query process definition list by project code
GET_TASK_LIST_BY_PROCESS_CODE_NOTES=query task definition list by process definition code
GET_TASK_LIST_BY_PROCESS_CODE_NOTES=query task definition list by process definition code

2
dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties

@ -360,4 +360,4 @@ TASK_DEFINITION_TAG=task definition related operation
PROCESS_TASK_RELATION_TAG=process task relation related operation
ENVIRONMENT_TAG=environment related operation
GET_PROCESS_LIST_BY_PROCESS_CODE_NOTES=query process definition list by project code
GET_TASK_LIST_BY_PROCESS_CODE_NOTES=query task definition list by process definition code
GET_TASK_LIST_BY_PROCESS_CODE_NOTES=query task definition list by process definition code

2
dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties

@ -357,4 +357,4 @@ TASK_DEFINITION_TAG=任务定义相关操作
PROCESS_TASK_RELATION_TAG=工作流关系相关操作
ENVIRONMENT_TAG=环境相关操作
GET_PROCESS_LIST_BY_PROCESS_CODE_NOTES=通过项目代码查询工作流定义
GET_TASK_LIST_BY_PROCESS_CODE_NOTES=通过工作流定义代码查询任务定义
GET_TASK_LIST_BY_PROCESS_CODE_NOTES=通过工作流定义代码查询任务定义

13
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecutorControllerTest.java

@ -66,6 +66,7 @@ public class ExecutorControllerTest extends AbstractControllerTest {
final ImmutableMap<String, String> startParams = ImmutableMap.of("start", "params");
final Integer expectedParallelismNumber = 6;
final int dryRun = 7;
final int testFlag = 0;
final ComplementDependentMode complementDependentMode = ComplementDependentMode.OFF_MODE;
final JsonObject expectResponseContent = gson
@ -98,12 +99,12 @@ public class ExecutorControllerTest extends AbstractControllerTest {
paramsMap.add("startParams", gson.toJson(startParams));
paramsMap.add("expectedParallelismNumber", String.valueOf(expectedParallelismNumber));
paramsMap.add("dryRun", String.valueOf(dryRun));
paramsMap.add("testFlag", String.valueOf(testFlag));
when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), eq(warningType),
eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode),
eq(timeout), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(complementDependentMode)))
eq(timeout), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag), eq(complementDependentMode)))
.thenReturn(executeServiceResult);
//When
@ -138,11 +139,12 @@ public class ExecutorControllerTest extends AbstractControllerTest {
paramsMap.add("startParams", gson.toJson(startParams));
paramsMap.add("expectedParallelismNumber", String.valueOf(expectedParallelismNumber));
paramsMap.add("dryRun", String.valueOf(dryRun));
paramsMap.add("testFlag", String.valueOf(testFlag));
when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), eq(warningType),
eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode),
eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), eq(expectedParallelismNumber), eq(dryRun),
eq(Constants.MAX_TASK_TIMEOUT), eq(startParams), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag),
eq(complementDependentMode))).thenReturn(executeServiceResult);
//When
@ -176,11 +178,12 @@ public class ExecutorControllerTest extends AbstractControllerTest {
paramsMap.add("timeout", String.valueOf(timeout));
paramsMap.add("expectedParallelismNumber", String.valueOf(expectedParallelismNumber));
paramsMap.add("dryRun", String.valueOf(dryRun));
paramsMap.add("testFlag", String.valueOf(testFlag));
when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(execType), eq(failureStrategy), eq(startNodeList), eq(taskDependType), eq(warningType),
eq(warningGroupId), eq(runMode), eq(processInstancePriority), eq(workerGroup), eq(environmentCode),
eq(timeout), eq(null), eq(expectedParallelismNumber), eq(dryRun),
eq(timeout), eq(null), eq(expectedParallelismNumber), eq(dryRun), eq(testFlag),
eq(complementDependentMode))).thenReturn(executeServiceResult);
//When
@ -207,7 +210,7 @@ public class ExecutorControllerTest extends AbstractControllerTest {
when(executorService.execProcessInstance(any(User.class), eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(null), eq(failureStrategy), eq(null), eq(null), eq(warningType),
eq(0), eq(null), eq(null), eq("default"), eq(-1L),
eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0),
eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0), eq(0),
eq(complementDependentMode))).thenReturn(executeServiceResult);
//When

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/DataSourceServiceTest.java

@ -315,7 +315,7 @@ public class DataSourceServiceTest {
DataSource dataSource = new DataSource();
dataSource.setType(DbType.MYSQL);
Mockito.when(dataSourceMapper.selectBatchIds(dataSourceIds)).thenReturn(Collections.singletonList(dataSource));
Map<String, Object> map = dataSourceService.queryDataSourceList(loginUser, DbType.MYSQL.ordinal());
Map<String, Object> map = dataSourceService.queryDataSourceList(loginUser, DbType.MYSQL.ordinal(),Constants.TEST_FLAG_NO);
Assert.assertEquals(Status.SUCCESS, map.get(Constants.STATUS));
}

33
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@ -222,7 +222,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
@ -244,7 +244,7 @@ public class ExecutorServiceTest {
null, "n1,n2",
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
@ -310,7 +310,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.START_PROCESS_INSTANCE_ERROR, result.get(Constants.STATUS));
verify(processService, times(0)).createCommand(any(Command.class));
@ -331,7 +331,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_SERIAL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(1)).createCommand(any(Command.class));
@ -352,7 +352,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(31)).createCommand(any(Command.class));
@ -374,7 +374,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(processService, times(15)).createCommand(any(Command.class));
@ -392,7 +392,7 @@ public class ExecutorServiceTest {
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(result.get(Constants.STATUS), Status.MASTER_NOT_EXISTS);
@ -408,6 +408,25 @@ public class ExecutorServiceTest {
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@Test
public void testOfTestRun(){
Mockito.when(processService.verifyIsNeedCreateCommand(any(Command.class))).thenReturn(true);
Mockito.when(projectService.checkProjectAndAuth(loginUser, project, projectCode, RERUN))
.thenReturn(checkProjectAndAuth());
Map<String, Object> result = executorService.execProcessInstance(loginUser, projectCode,
processDefinitionCode,
"{\"complementStartDate\":\"2020-01-01 00:00:00\",\"complementEndDate\":\"2020-01-31 23:00:00\"}",
CommandType.COMPLEMENT_DATA,
null, null,
null, null, 0,
RunMode.RUN_MODE_PARALLEL,
Priority.LOW, Constants.DEFAULT_WORKER_GROUP, 100L, 110, null, 15, Constants.DRY_RUN_FLAG_NO, Constants.TEST_FLAG_YES,
ComplementDependentMode.OFF_MODE);
Assert.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
@Test
public void testStartCheckByProcessDefinedCode() {
List<Long> ids = new ArrayList<>();

5
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java

@ -356,9 +356,8 @@ public class ProcessInstanceServiceTest {
res.setData("xxx");
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project, projectCode, WORKFLOW_INSTANCE)).thenReturn(result);
when(processService.findProcessInstanceDetailById(processInstance.getId()))
.thenReturn(Optional.of(processInstance));
when(processService.findValidTaskListByProcessId(processInstance.getId())).thenReturn(taskInstanceList);
when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(Optional.of(processInstance));
when(processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag())).thenReturn(taskInstanceList);
when(loggerService.queryLog(taskInstance.getId(), 0, 4098)).thenReturn(res);
Map<String, Object> successRes = processInstanceService.queryTaskListByProcessId(loginUser, projectCode, 1);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));

9
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -851,6 +851,13 @@ public final class Constants {
public static final String SECURITY_CONFIG_TYPE_PASSWORD = "PASSWORD";
public static final String SECURITY_CONFIG_TYPE_LDAP = "LDAP";
/**
* test flag
*/
public static final int TEST_FLAG_NO = 0;
public static final int TEST_FLAG_YES = 1;
/**
* Task Types
*/
@ -861,4 +868,6 @@ public final class Constants {
public static final String TYPE_DATA_QUALITY = "DataQuality";
public static final String TYPE_OTHER = "Other";
public static final String TYPE_MACHINE_LEARNING = "MachineLearning";
public static final String DATASOUCE = "datasource";
}

11
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/Command.java

@ -93,6 +93,12 @@ public class Command {
@TableField("process_definition_version")
private int processDefinitionVersion;
/**
* test flag
*/
@TableField("test_flag")
private int testFlag;
public Command() {
this.taskDependType = TaskDependType.TASK_POST;
this.failureStrategy = FailureStrategy.CONTINUE;
@ -115,7 +121,8 @@ public class Command {
Priority processInstancePriority,
int dryRun,
int processInstanceId,
int processDefinitionVersion) {
int processDefinitionVersion,
int testFlag) {
this.commandType = commandType;
this.executorId = executorId;
this.processDefinitionCode = processDefinitionCode;
@ -133,6 +140,6 @@ public class Command {
this.dryRun = dryRun;
this.processInstanceId = processInstanceId;
this.processDefinitionVersion = processDefinitionVersion;
this.testFlag = testFlag;
}
}

12
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/DataSource.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.entity;
import com.baomidou.mybatisplus.annotation.*;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.util.Date;
@ -79,6 +80,17 @@ public class DataSource {
*/
private Date updateTime;
/**
* test flag
*/
protected int testFlag;
/**
* bind test data source id
*/
@TableField(fill = FieldFill.INSERT_UPDATE)
protected Integer bindTestId;
@Override
public boolean equals(Object o) {
if (this == o) {

9
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ErrorCommand.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.dao.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
@ -121,9 +122,14 @@ public class ErrorCommand {
*/
private int dryRun;
/**
* test flag
*/
@TableField("test_flag")
private int testFlag;
public ErrorCommand() {
}
public ErrorCommand(Command command, String message) {
this.id = command.getId();
this.commandType = command.getCommandType();
@ -141,5 +147,6 @@ public class ErrorCommand {
this.processInstancePriority = command.getProcessInstancePriority();
this.message = message;
this.dryRun = command.getDryRun();
this.testFlag = command.getTestFlag();
}
}

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java

@ -276,6 +276,11 @@ public class ProcessInstance {
@TableField(exist = false)
private boolean isBlocked;
/**
* test flag
*/
private int testFlag;
/**
* set the process name with process define version and timestamp
*

5
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -300,6 +300,11 @@ public class TaskInstance implements Serializable {
*/
private TaskExecuteType taskExecuteType;
/**
* test flag
*/
private int testFlag;
public void init(String host, Date startTime, String executePath) {
this.host = host;
this.startTime = startTime;

22
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.java

@ -34,12 +34,12 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
public interface DataSourceMapper extends BaseMapper<DataSource> {
/**
* query datasource by type
* query online/testDatasource by type
* @param userId userId
* @param type type
* @return datasource list
*/
List<DataSource> queryDataSourceByType(@Param("userId") int userId, @Param("type") Integer type);
List<DataSource> queryDataSourceByType(@Param("userId") int userId, @Param("type") Integer type, @Param("testFlag") int testFlag);
/**
* datasource page
@ -90,7 +90,7 @@ public interface DataSourceMapper extends BaseMapper<DataSource> {
* @param <T> T
* @return UDF function list
*/
<T> List<DataSource> listAuthorizedDataSource(@Param("userId") int userId,@Param("dataSourceIds")T[] dataSourceIds);
<T> List<DataSource> listAuthorizedDataSource(@Param("userId") int userId, @Param("dataSourceIds") T[] dataSourceIds);
/**
* query datasource by name and user id
@ -108,5 +108,19 @@ public interface DataSourceMapper extends BaseMapper<DataSource> {
* @param searchVal
* @return
*/
IPage<DataSource> selectPagingByIds(Page<DataSource> dataSourcePage, @Param("dataSourceIds")List<Integer> dataSourceIds, @Param("name")String name);
IPage<DataSource> selectPagingByIds(Page<DataSource> dataSourcePage, @Param("dataSourceIds") List<Integer> dataSourceIds, @Param("name") String name);
/**
* clearBindTestId
* @param bindTestId
* @return
*/
void clearBindTestId(@Param("bindTestId") Integer bindTestId);
/**
* queryTestDataSourceId
* @param onlineDataSourceId
* @return Integer
*/
Integer queryTestDataSourceId(@Param("dataSourceId") Integer onlineDataSourceId);
}

10
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java

@ -187,11 +187,13 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
* @param definitionCode definitionCode
* @param startTime startTime
* @param endTime endTime
* @param testFlag testFlag
* @return process instance
*/
ProcessInstance queryLastSchedulerProcess(@Param("processDefinitionCode") Long definitionCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime);
@Param("endTime") Date endTime,
@Param("testFlag") int testFlag);
/**
* query last running process instance
@ -199,12 +201,14 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
* @param definitionCode definitionCode
* @param startTime startTime
* @param endTime endTime
* @param testFlag testFlag
* @param stateArray stateArray
* @return process instance
*/
ProcessInstance queryLastRunningProcess(@Param("processDefinitionCode") Long definitionCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("testFlag") int testFlag,
@Param("states") int[] stateArray);
/**
@ -213,11 +217,13 @@ public interface ProcessInstanceMapper extends BaseMapper<ProcessInstance> {
* @param definitionCode definitionCode
* @param startTime startTime
* @param endTime endTime
* @param testFlag testFlag
* @return process instance
*/
ProcessInstance queryLastManualProcess(@Param("processDefinitionCode") Long definitionCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime);
@Param("endTime") Date endTime,
@Param("testFlag") int testFlag);
/**
* query top n process instance order by running duration

3
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.java

@ -38,7 +38,8 @@ public interface TaskInstanceMapper extends BaseMapper<TaskInstance> {
@Param("state") Integer state);
List<TaskInstance> findValidTaskListByProcessId(@Param("processInstanceId") Integer processInstanceId,
@Param("flag") Flag flag);
@Param("flag") Flag flag,
@Param("testFlag") int testFlag);
List<TaskInstance> queryByHostAndStatus(@Param("host") String host,
@Param("states") int[] stateArray);

1
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java

@ -75,6 +75,7 @@ public class TaskInstanceUtils {
target.setCpuQuota(source.getCpuQuota());
target.setMemoryMax(source.getMemoryMax());
target.setTaskExecuteType(source.getTaskExecuteType());
target.setTestFlag(source.getTestFlag());
}
}

16
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/DataSourceMapper.xml

@ -26,6 +26,7 @@
<include refid="baseSql"/>
from t_ds_datasource
where type=#{type}
and test_flag=#{testFlag}
<if test="userId != 0">
and id in
(select datasource_id
@ -40,7 +41,7 @@
<select id="selectPaging" resultType="org.apache.dolphinscheduler.dao.entity.DataSource">
select
d.id, d.name, d.note, d.type, d.user_id, connection_params, d.create_time, d.update_time
d.id, d.name, d.note, d.type, d.user_id, connection_params, d.create_time, d.update_time, d.test_flag, d.bind_test_id
,
u.user_name as user_name
from t_ds_datasource d
@ -68,7 +69,7 @@
where name=#{name}
</select>
<select id="queryAuthedDatasource" resultType="org.apache.dolphinscheduler.dao.entity.DataSource">
select ds.id, ds.name, ds.note, ds.type, ds.user_id, ds.connection_params, ds.create_time, ds.update_time
select ds.id, ds.name, ds.note, ds.type, ds.user_id, ds.connection_params, ds.create_time, ds.update_time, ds.test_flag, ds.bind_test_id
from t_ds_datasource ds, t_ds_relation_datasource_user rel
where ds.id = rel.datasource_id AND rel.user_id = #{userId}
</select>
@ -115,7 +116,7 @@
<select id="selectPagingByIds" resultType="org.apache.dolphinscheduler.dao.entity.DataSource">
select
d.id, d.name, d.note, d.type, d.user_id, connection_params, d.create_time, d.update_time
d.id, d.name, d.note, d.type, d.user_id, connection_params, d.create_time, d.update_time, d.test_flag, d.bind_test_id
,
u.user_name as user_name
from t_ds_datasource d
@ -132,5 +133,12 @@
</if>
order by update_time desc
</select>
<select id="queryTestDataSourceId" resultType="java.lang.Integer">
select d.bind_test_id
from t_ds_datasource d
where d.id = #{dataSourceId}
</select>
<update id="clearBindTestId">
update t_ds_datasource d set bind_test_id=null where bind_test_id=#{bindTestId}
</update>
</mapper>

10
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml

@ -24,7 +24,7 @@
warning_group_id, schedule_time, command_start_time, global_params, flag,
update_time, is_sub_process, executor_id, history_cmd,
process_instance_priority, worker_group,environment_code, timeout, tenant_id, var_pool,
dry_run, next_process_instance_id, restart_time, state_history
dry_run, test_flag, next_process_instance_id, restart_time, state_history
</sql>
<select id="queryDetailById" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
@ -110,7 +110,7 @@
<select id="queryProcessInstanceListPaging" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select instance.id, instance.command_type, instance.executor_id, instance.process_definition_version,
instance.process_definition_code, instance.name, instance.state, instance.schedule_time, instance.start_time,
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run ,instance.next_process_instance_id,
instance.end_time, instance.run_times, instance.recovery, instance.host, instance.dry_run , instance.test_flag ,instance.next_process_instance_id,
restart_time, instance.state_history
from t_ds_process_instance instance
join t_ds_process_definition define ON instance.process_definition_code = define.code
@ -200,7 +200,7 @@
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{processDefinitionCode}
where process_definition_code=#{processDefinitionCode} and test_flag=#{testFlag}
<if test="startTime!=null and endTime != null ">
and schedule_time <![CDATA[ >= ]]> #{startTime} and schedule_time <![CDATA[ <= ]]> #{endTime}
</if>
@ -210,7 +210,7 @@
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{processDefinitionCode}
where process_definition_code=#{processDefinitionCode} and test_flag=#{testFlag}
<if test="states !=null and states.length != 0">
and state in
<foreach collection="states" item="i" index="index" open="(" separator="," close=")">
@ -227,7 +227,7 @@
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{processDefinitionCode}
where process_definition_code=#{processDefinitionCode} and test_flag=#{testFlag}
and schedule_time is null
<if test="startTime!=null and endTime != null ">
and start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}

5
dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml

@ -22,13 +22,13 @@
id, name, task_type, process_instance_id, task_code, task_definition_version, state, submit_time,
start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link,
flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id,
first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id, cpu_quota, memory_max, task_execute_type
first_submit_time, delay_time, task_params, var_pool, dry_run, test_flag, task_group_id, cpu_quota, memory_max, task_execute_type
</sql>
<sql id="baseSqlV2">
${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.process_instance_id, ${alias}.state, ${alias}.submit_time,
${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link,
${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id,
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.task_group_id, ${alias}.task_execute_type
${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.test_flag, ${alias}.task_group_id, ${alias}.task_execute_type
</sql>
<update id="setFailoverByHostAndStateArray">
update t_ds_task_instance
@ -54,6 +54,7 @@
from t_ds_task_instance
WHERE process_instance_id = #{processInstanceId}
and flag = #{flag}
and test_flag=#{testFlag}
order by start_time desc
</select>
<select id="queryByHostAndStatus" resultType="org.apache.dolphinscheduler.dao.entity.TaskInstance">

6
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -337,6 +337,7 @@ CREATE TABLE t_ds_command
dry_run int NULL DEFAULT 0,
process_instance_id int(11) DEFAULT 0,
process_definition_version int(11) DEFAULT 0,
test_flag int NULL DEFAULT 0,
PRIMARY KEY (id),
KEY priority_id_index (process_instance_priority, id)
);
@ -359,6 +360,8 @@ CREATE TABLE t_ds_datasource
connection_params text NOT NULL,
create_time datetime NOT NULL,
update_time datetime DEFAULT NULL,
test_flag int DEFAULT NULL,
bind_test_id int DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY t_ds_datasource_name_un (name, type)
);
@ -392,6 +395,7 @@ CREATE TABLE t_ds_error_command
dry_run int NULL DEFAULT 0,
process_instance_id int(11) DEFAULT 0,
process_definition_version int(11) DEFAULT 0,
test_flag int NULL DEFAULT 0,
PRIMARY KEY (id)
);
@ -620,6 +624,7 @@ CREATE TABLE t_ds_process_instance
var_pool longtext,
dry_run int NULL DEFAULT 0,
restart_time datetime DEFAULT NULL,
test_flag int NULL DEFAULT 0,
PRIMARY KEY (id)
);
@ -873,6 +878,7 @@ CREATE TABLE t_ds_task_instance
dry_run int NULL DEFAULT 0,
cpu_quota int(11) DEFAULT '-1' NOT NULL,
memory_max int(11) DEFAULT '-1' NOT NULL,
test_flag int NULL DEFAULT 0,
PRIMARY KEY (id)
);

6
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -343,6 +343,7 @@ CREATE TABLE `t_ds_command` (
`worker_group` varchar(64) COMMENT 'worker group',
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run',
`test_flag` tinyint(4) DEFAULT null COMMENT 'test flag:0 normal, 1 test run',
PRIMARY KEY (`id`),
KEY `priority_id_index` (`process_instance_priority`,`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
@ -364,6 +365,8 @@ CREATE TABLE `t_ds_datasource` (
`connection_params` text NOT NULL COMMENT 'json connection params',
`create_time` datetime NOT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
`test_flag` tinyint(4) DEFAULT NULL COMMENT 'test flag:0 normal, 1 testDataSource',
`bind_test_id` int(11) DEFAULT NULL COMMENT 'bind testDataSource id',
PRIMARY KEY (`id`),
UNIQUE KEY `t_ds_datasource_name_un` (`name`, `type`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
@ -396,6 +399,7 @@ CREATE TABLE `t_ds_error_command` (
`environment_code` bigint(20) DEFAULT '-1' COMMENT 'environment code',
`message` text COMMENT 'message',
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag: 0 normal, 1 dry run',
`test_flag` tinyint(4) DEFAULT null COMMENT 'test flag:0 normal, 1 test run',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
@ -622,6 +626,7 @@ CREATE TABLE `t_ds_process_instance` (
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run',
`next_process_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next processInstanceId',
`restart_time` datetime DEFAULT NULL COMMENT 'process instance restart time',
`test_flag` tinyint(4) DEFAULT null COMMENT 'test flag:0 normal, 1 test run',
PRIMARY KEY (`id`),
KEY `process_instance_index` (`process_definition_code`,`id`) USING BTREE,
KEY `start_time_index` (`start_time`,`end_time`) USING BTREE
@ -868,6 +873,7 @@ CREATE TABLE `t_ds_task_instance` (
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag: 0 normal, 1 dry run',
`cpu_quota` int(11) DEFAULT '-1' NOT NULL COMMENT 'cpuQuota(%): -1:Infinity',
`memory_max` int(11) DEFAULT '-1' NOT NULL COMMENT 'MemoryMax(MB): -1:Infinity',
`test_flag` tinyint(4) DEFAULT null COMMENT 'test flag:0 normal, 1 test run',
PRIMARY KEY (`id`),
KEY `process_instance_id` (`process_instance_id`) USING BTREE,
KEY `idx_code_version` (`task_code`, `task_definition_version`) USING BTREE

5
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -268,6 +268,7 @@ CREATE TABLE t_ds_command (
dry_run int DEFAULT '0' ,
process_instance_id int DEFAULT 0,
process_definition_version int DEFAULT 0,
test_flag int DEFAULT NULL ,
PRIMARY KEY (id)
) ;
@ -287,6 +288,8 @@ CREATE TABLE t_ds_datasource (
connection_params text NOT NULL ,
create_time timestamp NOT NULL ,
update_time timestamp DEFAULT NULL ,
test_flag int DEFAULT NULL ,
bind_test_id int DEFAULT NULL ,
PRIMARY KEY (id),
CONSTRAINT t_ds_datasource_name_un UNIQUE (name, type)
) ;
@ -550,6 +553,7 @@ CREATE TABLE t_ds_process_instance (
dry_run int DEFAULT '0' ,
next_process_instance_id int DEFAULT '0',
restart_time timestamp DEFAULT NULL ,
test_flag int DEFAULT NULL ,
PRIMARY KEY (id)
) ;
@ -773,6 +777,7 @@ CREATE TABLE t_ds_task_instance (
dry_run int DEFAULT '0' ,
cpu_quota int DEFAULT '-1' NOT NULL,
memory_max int DEFAULT '-1' NOT NULL,
test_flag int DEFAULT NULL ,
PRIMARY KEY (id)
) ;

122
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_ddl.sql

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
SET sql_mode=(SELECT REPLACE(@@sql_mode,'ONLY_FULL_GROUP_BY',''));
-- uc_dolphin_T_t_ds_command_R_test_flag
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_command_R_test_flag;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_command_R_test_flag()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='test_flag')
THEN
ALTER TABLE t_ds_command ADD `test_flag` tinyint(4) DEFAULT null COMMENT 'test flag:0 normal, 1 test run';
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_command_R_test_flag;
DROP PROCEDURE uc_dolphin_T_t_ds_command_R_test_flag;
-- uc_dolphin_T_t_ds_error_command_R_test_flag
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_error_command_R_test_flag;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_error_command_R_test_flag()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_error_command'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='test_flag')
THEN
ALTER TABLE t_ds_error_command ADD `test_flag` tinyint(4) DEFAULT null COMMENT 'test flag:0 normal, 1 test run';
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_error_command_R_test_flag;
DROP PROCEDURE uc_dolphin_T_t_ds_error_command_R_test_flag;
-- uc_dolphin_T_t_ds_datasource_R_test_flag_bind_test_id
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_datasource_R_test_flag_bind_test_id;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_datasource_R_test_flag_bind_test_id()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_datasource'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='test_flag')
and NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_datasource'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='bind_test_id')
THEN
ALTER TABLE t_ds_datasource ADD `test_flag` tinyint(4) DEFAULT null COMMENT 'test flag:0 normal, 1 testDataSource';
ALTER TABLE t_ds_datasource ADD `bind_test_id` int DEFAULT null COMMENT 'bind testDataSource id';
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_datasource_R_test_flag_bind_test_id;
DROP PROCEDURE uc_dolphin_T_t_ds_datasource_R_test_flag_bind_test_id;
-- uc_dolphin_T_t_ds_process_instance_R_test_flag
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_R_test_flag;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_process_instance_R_test_flag()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_process_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='test_flag')
THEN
ALTER TABLE t_ds_process_instance ADD `test_flag` tinyint(4) DEFAULT null COMMENT 'test flag:0 normal, 1 test run';
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_process_instance_R_test_flag;
DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_R_test_flag;
-- uc_dolphin_T_t_ds_task_instance_R_test_flag
drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_R_test_flag;
delimiter d//
CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_R_test_flag()
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_task_instance'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='test_flag')
THEN
ALTER TABLE t_ds_task_instance ADD `test_flag` tinyint(4) DEFAULT null COMMENT 'test flag:0 normal, 1 test run';
END IF;
END;
d//
delimiter ;
CALL uc_dolphin_T_t_ds_task_instance_R_test_flag;
DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_R_test_flag;

16
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_dml.sql

@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

121
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_ddl.sql

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- uc_dolphin_T_t_ds_command_R_test_flag
delimiter ;
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_command_R_test_flag();
delimiter d//
CREATE FUNCTION uc_dolphin_T_t_ds_command_R_test_flag() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_command'
AND COLUMN_NAME ='test_flag')
THEN
ALTER TABLE t_ds_command alter column test_flag type int DEFAULT NULL;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select uc_dolphin_T_t_ds_command_R_test_flag();
DROP FUNCTION uc_dolphin_T_t_ds_command_R_test_flag();
-- uc_dolphin_T_t_ds_error_command_R_test_flag
delimiter ;
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_error_command_R_test_flag();
delimiter d//
CREATE FUNCTION uc_dolphin_T_t_ds_error_command_R_test_flag() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_error_command'
AND COLUMN_NAME ='test_flag')
THEN
ALTER TABLE t_ds_error_command alter column test_flag type int DEFAULT NULL;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select uc_dolphin_T_t_ds_error_command_R_test_flag();
DROP FUNCTION uc_dolphin_T_t_ds_error_command_R_test_flag();
-- uc_dolphin_T_t_ds_datasource_R_test_flag_bind_test_id
delimiter ;
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_datasource_R_test_flag_bind_test_id();
delimiter d//
CREATE FUNCTION uc_dolphin_T_t_ds_datasource_R_test_flag_bind_test_id() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_datasource'
AND COLUMN_NAME ='test_flag')
THEN
ALTER TABLE t_ds_datasource alter column test_flag type int DEFAULT NULL;
ALTER TABLE t_ds_datasource alter column bind_test_id type int DEFAULT NULL;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select uc_dolphin_T_t_ds_datasource_R_test_flag_bind_test_id();
DROP FUNCTION uc_dolphin_T_t_ds_datasource_R_test_flag_bind_test_id();
-- uc_dolphin_T_t_ds_process_instance_R_test_flag
delimiter ;
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_process_instance_R_test_flag();
delimiter d//
CREATE FUNCTION uc_dolphin_T_t_ds_process_instance_R_test_flag() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_process_instance'
AND COLUMN_NAME ='test_flag')
THEN
ALTER TABLE t_ds_process_instance alter column test_flag type int DEFAULT NULL;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select uc_dolphin_T_t_ds_process_instance_R_test_flag();
DROP FUNCTION uc_dolphin_T_t_ds_process_instance_R_test_flag();
-- uc_dolphin_T_t_ds_task_instance_R_test_flag
delimiter ;
DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_R_test_flag();
delimiter d//
CREATE FUNCTION uc_dolphin_T_t_ds_task_instance_R_test_flag() RETURNS void AS $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_CATALOG=current_database()
AND TABLE_SCHEMA=current_schema()
AND TABLE_NAME='t_ds_task_instance'
AND COLUMN_NAME ='test_flag')
THEN
ALTER TABLE t_ds_task_instance alter column test_flag type int DEFAULT NULL;
END IF;
END;
$$ LANGUAGE plpgsql;
d//
delimiter ;
select uc_dolphin_T_t_ds_task_instance_R_test_flag();
DROP FUNCTION uc_dolphin_T_t_ds_task_instance_R_test_flag();

29
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_dml.sql

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
delimiter d//
return 'Success!';
exception when others then
---Raise EXCEPTION '(%)',SQLERRM;
return SQLERRM;
END;
$BODY$;
select dolphin_insert_dq_initial_data();
d//

4
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/DataSourceMapperTest.java

@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.BaseDaoTest;
@ -130,7 +131,7 @@ public class DataSourceMapperTest extends BaseDaoTest {
Map<Integer, DataSource> datasourceMap = createDataSourceMap(userId, "test");
List<DataSource> actualDataSources = dataSourceMapper.queryDataSourceByType(
0, DbType.MYSQL.ordinal());
0, DbType.MYSQL.ordinal(), Constants.TEST_FLAG_NO);
assertThat(actualDataSources.size(), greaterThanOrEqualTo(2));
@ -352,6 +353,7 @@ public class DataSourceMapperTest extends BaseDaoTest {
dataSource.setType(DbType.MYSQL);
dataSource.setNote("mysql test");
dataSource.setConnectionParams("hello mysql");
dataSource.setTestFlag(Constants.TEST_FLAG_NO);
dataSource.setUpdateTime(DateUtils.getCurrentDate());
dataSource.setCreateTime(DateUtils.getCurrentDate());

12
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java

@ -78,7 +78,7 @@ public class ProcessInstanceMapperTest extends BaseDaoTest {
processInstance.setStartTime(start);
processInstance.setEndTime(end);
processInstance.setState(WorkflowExecutionStatus.SUBMITTED_SUCCESS);
processInstance.setTestFlag(0);
processInstanceMapper.insert(processInstance);
return processInstance;
}
@ -305,7 +305,7 @@ public class ProcessInstanceMapperTest extends BaseDaoTest {
processInstanceMapper.updateById(processInstance);
ProcessInstance processInstance1 =
processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionCode(), null, null);
processInstanceMapper.queryLastSchedulerProcess(processInstance.getProcessDefinitionCode(), null, null, processInstance.getTestFlag());
Assert.assertNotEquals(processInstance1, null);
processInstanceMapper.deleteById(processInstance.getId());
}
@ -324,7 +324,7 @@ public class ProcessInstanceMapperTest extends BaseDaoTest {
WorkflowExecutionStatus.SUBMITTED_SUCCESS.ordinal()};
ProcessInstance processInstance1 = processInstanceMapper
.queryLastRunningProcess(processInstance.getProcessDefinitionCode(), null, null, stateArray);
.queryLastRunningProcess(processInstance.getProcessDefinitionCode(), null, null, processInstance.getTestFlag(), stateArray);
Assert.assertNotEquals(processInstance1, null);
processInstanceMapper.deleteById(processInstance.getId());
@ -341,12 +341,12 @@ public class ProcessInstanceMapperTest extends BaseDaoTest {
Date start = new Date(2019 - 1900, 1 - 1, 01, 0, 0, 0);
Date end = new Date(2019 - 1900, 1 - 1, 01, 5, 0, 0);
ProcessInstance processInstance1 =
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end);
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end, processInstance.getTestFlag());
Assert.assertEquals(processInstance1.getId(), processInstance.getId());
start = new Date(2019 - 1900, 1 - 1, 01, 1, 0, 0);
processInstance1 =
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end);
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end, processInstance.getTestFlag());
Assert.assertNull(processInstance1);
processInstanceMapper.deleteById(processInstance.getId());
@ -396,4 +396,4 @@ public class ProcessInstanceMapperTest extends BaseDaoTest {
processInstanceMapper.deleteById(processInstance3.getId());
}
}
}

8
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java

@ -72,6 +72,7 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
processInstance.setStartTime(new Date());
processInstance.setEndTime(new Date());
processInstance.setProcessDefinitionCode(1L);
processInstance.setTestFlag(0);
processInstanceMapper.insert(processInstance);
return processInstanceMapper.queryByProcessDefineCode(1L, 1).get(0);
}
@ -176,13 +177,14 @@ public class TaskInstanceMapperTest extends BaseDaoTest {
List<TaskInstance> taskInstances = taskInstanceMapper.findValidTaskListByProcessId(
task.getProcessInstanceId(),
Flag.YES);
Flag.YES,
processInstance.getTestFlag());
task2.setFlag(Flag.NO);
taskInstanceMapper.updateById(task2);
List<TaskInstance> taskInstances1 = taskInstanceMapper.findValidTaskListByProcessId(task.getProcessInstanceId(),
Flag.NO);
Flag.NO,
processInstance.getTestFlag());
taskInstanceMapper.deleteById(task2.getId());
taskInstanceMapper.deleteById(task.getId());
Assert.assertNotEquals(taskInstances.size(), 0);

20
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/datasource/BaseDataSourceParamDTO.java

@ -43,6 +43,10 @@ public abstract class BaseDataSourceParamDTO implements Serializable {
protected String password;
protected int testFlag;
protected Integer bindTestId;
protected Map<String, String> other;
public Integer getId() {
@ -117,6 +121,22 @@ public abstract class BaseDataSourceParamDTO implements Serializable {
this.other = other;
}
public int getTestFlag() {
return testFlag;
}
public void setTestFlag(int testFlag) {
this.testFlag = testFlag;
}
public Integer getBindTestId() {
return bindTestId;
}
public void setBindTestId(Integer bindTestId) {
this.bindTestId = bindTestId;
}
/**
* Get the datasource type
* see{@link DbType}

1
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java

@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import static java.lang.String.format;

4
dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/ClickhouseDataSourceE2ETest.java

@ -65,6 +65,8 @@ public class ClickhouseDataSourceE2ETest {
private static final String jdbcParams = "";
private static final int testFlag = 1;
@BeforeAll
public static void setup() {
@ -78,7 +80,7 @@ public class ClickhouseDataSourceE2ETest {
void testCreateClickhouseDataSource() {
final DataSourcePage page = new DataSourcePage(browser);
page.createDataSource(dataSourceType, dataSourceName, dataSourceDescription, ip, port, userName, pgPassword, database, jdbcParams);
page.createDataSource(dataSourceType, dataSourceName, dataSourceDescription, ip, port, userName, pgPassword, database, jdbcParams, testFlag);
new WebDriverWait(page.driver(), 10).until(ExpectedConditions.invisibilityOfElementLocated(
new By.ByClassName("dialog-create-data-source")));

3
dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/HiveDataSourceE2ETest.java

@ -64,6 +64,7 @@ public class HiveDataSourceE2ETest {
private static final String jdbcParams = "";
private static final int testFlag = 1;
@BeforeAll
public static void setup() {
@ -77,7 +78,7 @@ public class HiveDataSourceE2ETest {
void testCreateHiveDataSource() {
final DataSourcePage page = new DataSourcePage(browser);
page.createDataSource(dataSourceType, dataSourceName, dataSourceDescription, ip, port, userName, hivePassword, database, jdbcParams);
page.createDataSource(dataSourceType, dataSourceName, dataSourceDescription, ip, port, userName, hivePassword, database, jdbcParams, testFlag);
new WebDriverWait(page.driver(), 10).until(ExpectedConditions.invisibilityOfElementLocated(
new By.ByClassName("dialog-create-data-source")));

3
dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/MysqlDataSourceE2ETest.java

@ -66,6 +66,7 @@ public class MysqlDataSourceE2ETest {
private static final String jdbcParams = "{\"useSSL\": false}";
private static final int testFlag = 1;
@BeforeAll
public static void setup() {
@ -79,7 +80,7 @@ public class MysqlDataSourceE2ETest {
void testCreateMysqlDataSource() {
final DataSourcePage page = new DataSourcePage(browser);
page.createDataSource(dataSourceType, dataSourceName, dataSourceDescription, ip, port, userName, mysqlPassword, database, jdbcParams);
page.createDataSource(dataSourceType, dataSourceName, dataSourceDescription, ip, port, userName, mysqlPassword, database, jdbcParams, testFlag);
new WebDriverWait(page.driver(), 10).until(ExpectedConditions.invisibilityOfElementLocated(
new By.ByClassName("dialog-create-data-source")));

3
dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/PostgresDataSourceE2ETest.java

@ -65,6 +65,7 @@ public class PostgresDataSourceE2ETest {
private static final String jdbcParams = "";
private static final int testFlag = 1;
@BeforeAll
public static void setup() {
@ -78,7 +79,7 @@ public class PostgresDataSourceE2ETest {
void testCreatePostgresDataSource() {
final DataSourcePage page = new DataSourcePage(browser);
page.createDataSource(dataSourceType, dataSourceName, dataSourceDescription, ip, port, userName, pgPassword, database, jdbcParams);
page.createDataSource(dataSourceType, dataSourceName, dataSourceDescription, ip, port, userName, pgPassword, database, jdbcParams, testFlag);
new WebDriverWait(page.driver(), 10).until(ExpectedConditions.invisibilityOfElementLocated(
new By.ByClassName("dialog-create-data-source")));

3
dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/cases/SqlServerDataSourceE2ETest.java

@ -65,6 +65,7 @@ public class SqlServerDataSourceE2ETest {
private static final String jdbcParams = "";
private static final int testFlag = 1;
@BeforeAll
public static void setup() {
@ -78,7 +79,7 @@ public class SqlServerDataSourceE2ETest {
void testCreateSqlServerDataSource() {
final DataSourcePage page = new DataSourcePage(browser);
page.createDataSource(dataSourceType, dataSourceName, dataSourceDescription, ip, port, userName, pgPassword, database, jdbcParams);
page.createDataSource(dataSourceType, dataSourceName, dataSourceDescription, ip, port, userName, pgPassword, database, jdbcParams, testFlag);
new WebDriverWait(page.driver(), 10).until(ExpectedConditions.invisibilityOfElementLocated(
new By.ByClassName("dialog-create-data-source")));

16
dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/java/org/apache/dolphinscheduler/e2e/pages/datasource/DataSourcePage.java

@ -65,7 +65,7 @@ public class DataSourcePage extends NavBarPage implements NavBarPage.NavBarItem
}
public DataSourcePage createDataSource(String dataSourceType, String dataSourceName, String dataSourceDescription, String ip, String port, String userName, String password, String database,
String jdbcParams) {
String jdbcParams, int testFlag) {
buttonCreateDataSource().click();
new WebDriverWait(driver, 10).until(ExpectedConditions.visibilityOfElementLocated(
@ -91,6 +91,8 @@ public class DataSourcePage extends NavBarPage implements NavBarPage.NavBarItem
createDataSourceForm().inputUserName().sendKeys(userName);
createDataSourceForm().inputPassword().sendKeys(password);
createDataSourceForm().inputDataBase().sendKeys(database);
createDataSourceForm().radioTestDatasource().click();
if (!"".equals(jdbcParams)) {
createDataSourceForm().inputJdbcParams().sendKeys(jdbcParams);
@ -179,6 +181,15 @@ public class DataSourcePage extends NavBarPage implements NavBarPage.NavBarItem
})
private WebElement inputJdbcParams;
@FindBy(className = "radio-test-datasource")
private WebElement radioTestDatasource;
@FindBy(className = "radio-online-datasource")
private WebElement radioOnlineDatasource;
@FindBy(className = "select-bind-test-data-source-type-drop-down")
private WebElement selectBindTestDataSourceId;
@FindBy(className = "btn-submit")
private WebElement buttonSubmit;
@ -186,6 +197,7 @@ public class DataSourcePage extends NavBarPage implements NavBarPage.NavBarItem
private WebElement buttonCancel;
@FindBy(className = "btn-test-connection")
private WebElement btnTestConnection;
private WebElement radioTestConnection;
}
}

1
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.server.master.rpc.MasterRPCServer;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerBootstrap;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.quartz.SchedulerException;

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java

@ -26,19 +26,26 @@ import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* TaskExecutionContext builder
*/
public class TaskExecutionContextBuilder {
protected final Logger logger =
LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
public static TaskExecutionContextBuilder get() {
return new TaskExecutionContextBuilder();
}
@ -65,6 +72,7 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setDelayTime(taskInstance.getDelayTime());
taskExecutionContext.setVarPool(taskInstance.getVarPool());
taskExecutionContext.setDryRun(taskInstance.getDryRun());
taskExecutionContext.setTestFlag(taskInstance.getTestFlag());
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUBMITTED_SUCCESS);
taskExecutionContext.setCpuQuota(taskInstance.getCpuQuota());
taskExecutionContext.setMemoryMax(taskInstance.getMemoryMax());

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -658,6 +658,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
command.setDryRun(processInstance.getDryRun());
command.setProcessInstanceId(0);
command.setProcessDefinitionVersion(processInstance.getProcessDefinitionVersion());
command.setTestFlag(processInstance.getTestFlag());
return processService.createCommand(command);
}
@ -814,7 +815,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
processInstance.getRunTimes(),
processInstance.getRecovery());
List<TaskInstance> validTaskInstanceList =
processService.findValidTaskListByProcessId(processInstance.getId());
processService.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
for (TaskInstance task : validTaskInstanceList) {
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(task.getProcessInstanceId(), task.getId());
@ -1142,6 +1143,9 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatue> {
// task instance start time
taskInstance.setStartTime(null);
// task test flag
taskInstance.setTestFlag(processInstance.getTestFlag());
// task instance flag
taskInstance.setFlag(Flag.YES);

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BlockingTaskProcessor.java

@ -172,7 +172,7 @@ public class BlockingTaskProcessor extends BaseTaskProcessor {
private void setConditionResult() {
List<TaskInstance> taskInstances = processService
.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), processInstance.getTestFlag());
for (TaskInstance task : taskInstances) {
completeTaskList.putIfAbsent(task.getTaskCode(), task.getState());
}

36
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -17,9 +17,10 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import com.google.auto.service.AutoService;
import org.apache.commons.lang3.StringUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
@ -33,7 +34,12 @@ import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.apache.commons.lang3.StringUtils;
import java.util.Date;
import java.util.Map;
import com.google.auto.service.AutoService;
/**
* common task processor
@ -47,6 +53,10 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
@Override
protected boolean submitTask() {
if (this.taskInstance.getTestFlag() == Constants.TEST_FLAG_YES) {
convertExeEnvironmentOnlineToTest();
}
this.taskInstance =
processService.submitTaskWithRetry(processInstance, taskInstance, maxRetryTimes, commitInterval);
@ -171,4 +181,26 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
nettyExecutorManager.executeDirectly(executionContext);
}
protected void convertExeEnvironmentOnlineToTest() {
//SQL taskType
if (TaskConstants.TASK_TYPE_SQL.equals(taskInstance.getTaskType())) {
//replace test data source
Map<String, Object> taskDefinitionParams = JSONUtils.parseObject(taskInstance.getTaskDefine().getTaskParams(), new TypeReference<Map<String, Object>>() {
});
Map<String, Object> taskInstanceParams = JSONUtils.parseObject(taskInstance.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
Integer onlineDataSourceId = (Integer) taskDefinitionParams.get(Constants.DATASOUCE);
Integer testDataSourceId = processService.queryTestDataSourceId(onlineDataSourceId);
taskDefinitionParams.put(Constants.DATASOUCE, testDataSourceId);
taskInstanceParams.put(Constants.DATASOUCE, testDataSourceId);
taskInstance.getTaskDefine().setTaskParams(JSONUtils.toJsonString(taskDefinitionParams));
taskInstance.setTaskParams(JSONUtils.toJsonString(taskInstanceParams));
if (null == testDataSourceId) {
logger.warn("task name :{}, test data source replacement failed", taskInstance.getName());
} else {
logger.info("task name :{}, test data source replacement succeeded", taskInstance.getName());
}
}
}
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java

@ -144,8 +144,8 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
private void setConditionResult() {
List<TaskInstance> taskInstances =
processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
List<TaskInstance> taskInstances
= processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(), processInstance.getTestFlag());
for (TaskInstance task : taskInstances) {
completeTaskList.putIfAbsent(task.getTaskCode(), task.getState());
}

10
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java

@ -85,6 +85,11 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
DependResult result;
/**
* test flag
*/
private int testFlag;
boolean allDependentItemFinished;
@Override
@ -162,6 +167,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
} else {
this.dependentDate = new Date();
}
this.testFlag = processInstance.getTestFlag();
// check dependent project is exist
List<DependentTaskModel> dependTaskList = dependentParameters.getDependTaskList();
Set<Long> projectCodes = new HashSet<>();
@ -240,7 +246,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
logger.info("dependent item complete, dependentKey: {}, result: {}, dependentDate: {}", entry.getKey(), entry.getValue(), dependentDate);
}
}
if (!dependentExecute.finish(dependentDate)) {
if (!dependentExecute.finish(dependentDate, testFlag)) {
finish = false;
}
}
@ -255,7 +261,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
private DependResult getTaskDependResult() {
List<DependResult> dependResultList = new ArrayList<>();
for (DependentExecute dependentExecute : dependentTaskList) {
DependResult dependResult = dependentExecute.getModelDependResult(dependentDate);
DependResult dependResult = dependentExecute.getModelDependResult(dependentDate, testFlag);
dependResultList.add(dependResult);
}
result = DependentUtils.getDependResultForRelation(this.dependentParameters.getRelation(), dependResultList);

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java

@ -126,7 +126,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
private boolean setSwitchResult() {
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
taskInstance.getProcessInstanceId());
taskInstance.getProcessInstanceId(), processInstance.getTestFlag());
Map<String, TaskExecutionStatus> completeTaskList = new HashMap<>();
for (TaskInstance task : taskInstances) {
completeTaskList.putIfAbsent(task.getName(), task.getState());

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@ -163,7 +163,7 @@ public class MasterFailoverService {
processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
int processInstanceId = processInstance.getId();
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId, processInstance.getTestFlag());
for (TaskInstance taskInstance : taskInstanceList) {
try {
LoggerUtils.setTaskInstanceIdMDC(taskInstance.getId());

33
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/DependentExecute.java

@ -91,10 +91,10 @@ public class DependentExecute {
* @param currentTime current time
* @return DependResult
*/
private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime) {
private DependResult getDependentResultForItem(DependentItem dependentItem, Date currentTime, int testFlag) {
List<DateInterval> dateIntervals =
DependentUtils.getDateIntervalList(currentTime, dependentItem.getDateValue());
return calculateResultForTasks(dependentItem, dateIntervals);
return calculateResultForTasks(dependentItem, dateIntervals, testFlag);
}
/**
@ -105,12 +105,13 @@ public class DependentExecute {
* @return dateIntervals
*/
private DependResult calculateResultForTasks(DependentItem dependentItem,
List<DateInterval> dateIntervals) {
List<DateInterval> dateIntervals,
int testFlag) {
DependResult result = DependResult.FAILED;
for (DateInterval dateInterval : dateIntervals) {
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(),
dateInterval);
dateInterval, testFlag);
if (processInstance == null) {
return DependResult.WAITING;
}
@ -118,7 +119,7 @@ public class DependentExecute {
if (dependentItem.getDepTaskCode() == Constants.DEPENDENT_ALL_TASK_CODE) {
result = dependResultByProcessInstance(processInstance);
} else {
result = getDependTaskResult(dependentItem.getDepTaskCode(), processInstance);
result = getDependTaskResult(dependentItem.getDepTaskCode(), processInstance, testFlag);
}
if (result != DependResult.SUCCESS) {
break;
@ -149,10 +150,10 @@ public class DependentExecute {
* @param processInstance
* @return
*/
private DependResult getDependTaskResult(long taskCode, ProcessInstance processInstance) {
private DependResult getDependTaskResult(long taskCode, ProcessInstance processInstance, int testFlag) {
DependResult result;
TaskInstance taskInstance = null;
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId(), testFlag);
for (TaskInstance task : taskInstanceList) {
if (task.getTaskCode() == taskCode) {
@ -185,12 +186,12 @@ public class DependentExecute {
* @param dateInterval date interval
* @return ProcessInstance
*/
private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval) {
private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
ProcessInstance lastSchedulerProcess =
processService.findLastSchedulerProcessInterval(definitionCode, dateInterval);
processService.findLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag);
ProcessInstance lastManualProcess = processService.findLastManualProcessInterval(definitionCode, dateInterval);
ProcessInstance lastManualProcess = processService.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
if (lastManualProcess == null) {
return lastSchedulerProcess;
@ -226,9 +227,9 @@ public class DependentExecute {
* @param currentTime current time
* @return boolean
*/
public boolean finish(Date currentTime) {
public boolean finish(Date currentTime, int testFlag) {
if (modelDependResult == DependResult.WAITING) {
modelDependResult = getModelDependResult(currentTime);
modelDependResult = getModelDependResult(currentTime, testFlag);
return false;
}
return true;
@ -240,12 +241,12 @@ public class DependentExecute {
* @param currentTime current time
* @return DependResult
*/
public DependResult getModelDependResult(Date currentTime) {
public DependResult getModelDependResult(Date currentTime, int testFlag) {
List<DependResult> dependResultList = new ArrayList<>();
for (DependentItem dependentItem : dependItemList) {
DependResult dependResult = getDependResultForItem(dependentItem, currentTime);
DependResult dependResult = getDependResultForItem(dependentItem, currentTime, testFlag);
if (dependResult != DependResult.WAITING) {
dependResultMap.put(dependentItem.getKey(), dependResult);
}
@ -262,12 +263,12 @@ public class DependentExecute {
* @param currentTime current time
* @return DependResult
*/
private DependResult getDependResultForItem(DependentItem item, Date currentTime) {
private DependResult getDependResultForItem(DependentItem item, Date currentTime, int testFlag) {
String key = item.getKey();
if (dependResultMap.containsKey(key)) {
return dependResultMap.get(key);
}
return getDependentResultForItem(item, currentTime);
return getDependentResultForItem(item, currentTime, testFlag);
}
public Map<String, DependResult> getDependResultMap() {

3
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java

@ -200,7 +200,8 @@ public class BlockingTaskTest {
// for BlockingTaskExecThread.waitTaskQuit
List<TaskInstance> conditions = getTaskInstanceForValidTaskList(expectResults);
Mockito.when(processService.findValidTaskListByProcessId(processInstance.getId()))
Mockito.when(processService.
findValidTaskListByProcessId(processInstance.getId(),processInstance.getTestFlag()))
.thenReturn(conditions);
return taskInstance;
}

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/ConditionsTaskTest.java

@ -110,7 +110,7 @@ public class ConditionsTaskTest {
List<TaskInstance> conditions = Stream.of(
getTaskInstanceForValidTaskList(expectResult)).collect(Collectors.toList());
Mockito.when(processService
.findValidTaskListByProcessId(processInstance.getId()))
.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
.thenReturn(conditions);
return taskInstance;
}

25
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/DependentTaskTest.java

@ -147,12 +147,12 @@ public class DependentTaskTest {
getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.FAILURE);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(dependentProcessInstance);
// for DependentExecute.getDependTaskResult
Mockito.when(processService
.findValidTaskListByProcessId(200))
.findValidTaskListByProcessId(200, 0))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.SUCCESS, DEPEND_TASK_CODE_A,
dependentProcessInstance),
@ -169,12 +169,12 @@ public class DependentTaskTest {
getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.SUCCESS);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(dependentProcessInstance);
// for DependentExecute.getDependTaskResult
Mockito.when(processService
.findValidTaskListByProcessId(200))
.findValidTaskListByProcessId(200, 0))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.FAILURE, DEPEND_TASK_CODE_A,
dependentProcessInstance),
@ -217,21 +217,21 @@ public class DependentTaskTest {
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(processInstance200);
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(3L), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(3L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(processInstance300);
// for DependentExecute.getDependTaskResult
Mockito.when(processService
.findValidTaskListByProcessId(200))
.findValidTaskListByProcessId(200, 0))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(2000, TaskExecutionStatus.FAILURE, DEPEND_TASK_CODE_A,
processInstance200))
.collect(Collectors.toList()));
Mockito.when(processService
.findValidTaskListByProcessId(300))
.findValidTaskListByProcessId(300, 0))
.thenReturn(Stream.of(
getTaskInstanceForValidTaskList(3000, TaskExecutionStatus.SUCCESS, DEPEND_TASK_CODE_B,
processInstance300),
@ -270,7 +270,7 @@ public class DependentTaskTest {
testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.SUCCESS));
// DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
@ -283,7 +283,7 @@ public class DependentTaskTest {
testDependentOnAllInit();
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.FAILURE));
// DependentTaskExecThread dependentTask = new DependentTaskExecThread(taskInstance);
@ -320,14 +320,14 @@ public class DependentTaskTest {
getProcessInstanceForFindLastRunningProcess(200, WorkflowExecutionStatus.RUNNING_EXECUTION);
// for DependentExecute.findLastProcessInterval
Mockito.when(processService
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any()))
.findLastRunningProcess(Mockito.eq(2L), Mockito.any(), Mockito.any(), Mockito.anyInt()))
.thenReturn(dependentProcessInstance);
// DependentTaskExecThread taskExecThread = new DependentTaskExecThread(taskInstance);
// for DependentExecute.getDependTaskResult
Mockito.when(processService
.findValidTaskListByProcessId(200))
.findValidTaskListByProcessId(200, 0))
.thenAnswer(i -> {
processInstance.setState(WorkflowExecutionStatus.READY_STOP);
return Stream.of(
@ -410,6 +410,7 @@ public class DependentTaskTest {
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(processInstanceId);
processInstance.setState(state);
processInstance.setTestFlag(0);
return processInstance;
}

84
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java

@ -17,17 +17,16 @@
package org.apache.dolphinscheduler.server.master.runner.task;
import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
@ -37,22 +36,43 @@ import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.mockito.junit.MockitoJUnitRunner;
import org.springframework.context.ApplicationContext;
@RunWith(SpringJUnit4ClassRunner.class)
@Ignore
@RunWith(MockitoJUnitRunner.Silent.class)
public class CommonTaskProcessorTest {
@Autowired
private ProcessService processService;
private CommonTaskProcessor commonTaskProcessor;
@Autowired
private ProcessService processService;
@Before
public void setUp() {
// init spring context
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
// mock process service
processService = Mockito.mock(ProcessService.class);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
commonTaskProcessor = Mockito.mock(CommonTaskProcessor.class);
Mockito.when(applicationContext.getBean(CommonTaskProcessor.class)).thenReturn(commonTaskProcessor);
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskDefinition.setTimeoutNotifyStrategy(TaskTimeoutStrategy.WARN);
taskDefinition.setTimeout(0);
Mockito.when(processService.findTaskDefinition(1L, 1))
.thenReturn(taskDefinition);
}
@Test
public void testGetTaskExecutionContext() throws Exception {
@ -81,11 +101,9 @@ public class CommonTaskProcessorTest {
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTimeoutFlag(TimeoutFlag.OPEN);
taskInstance.setTaskDefine(taskDefinition);
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskExecutionContext taskExecutionContext = commonTaskProcessor.getTaskExecutionContext(taskInstance);
Assert.assertNotNull(taskExecutionContext);
Assert.assertNull(taskExecutionContext);
}
@Test
@ -99,17 +117,15 @@ public class CommonTaskProcessorTest {
taskInstance.setWorkerGroup("NoWorkGroup");
taskInstance.setExecutorId(2);
// task node
commonTaskProcessor = Mockito.mock(CommonTaskProcessor.class);
Map<String, String> map = commonTaskProcessor.getResourceFullNames(taskInstance);
List<Resource> resourcesList = new ArrayList<Resource>();
List<Resource> resourcesList = new ArrayList<>();
Resource resource = new Resource();
resource.setFileName("fileName");
resourcesList.add(resource);
Mockito.doReturn(resourcesList).when(processService).listResourceByIds(new Integer[]{123});
Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(),
ResourceType.FILE);
Mockito.doReturn("tenantCode").when(processService).queryTenantCodeByResName(resource.getFullName(), ResourceType.FILE);
Assert.assertNotNull(map);
}
@ -128,7 +144,7 @@ public class CommonTaskProcessorTest {
taskInstance.setProcessInstance(processInstance);
boolean res = commonTaskProcessor.verifyTenantIsNull(tenant, taskInstance);
Assert.assertTrue(res);
Assert.assertFalse(res);
tenant = new Tenant();
tenant.setId(1);
@ -142,4 +158,26 @@ public class CommonTaskProcessorTest {
}
@Test
public void testReplaceTestDatSource() {
CommonTaskProcessor commonTaskProcessor1 = new CommonTaskProcessor();
commonTaskProcessor1.processService = processService;
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTestFlag(1);
taskInstance.setTaskType("SQL");
taskInstance.setTaskParams("{\"localParams\":[],\"resourceList\":[],\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from 'order'\",\"sqlType\":\"0\",\"preStatements\":[],\"postStatements\":[],\"segmentSeparator\":\"\",\"displayRows\":10}");
TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setTaskParams("{\"localParams\":[],\"resourceList\":[],\"type\":\"MYSQL\",\"datasource\":1,\"sql\":\"select * from 'order'\",\"sqlType\":\"0\",\"preStatements\":[],\"postStatements\":[],\"segmentSeparator\":\"\",\"displayRows\":10}");
taskInstance.setTaskDefine(taskDefinition);
commonTaskProcessor1.taskInstance = taskInstance;
//The data source instance has no bound test data source
Mockito.when(processService.queryTestDataSourceId(any(Integer.class))).thenReturn(null);
commonTaskProcessor1.convertExeEnvironmentOnlineToTest();
//The data source instance has bound test data source
Mockito.when(processService.queryTestDataSourceId(any(Integer.class))).thenReturn(2);
commonTaskProcessor1.convertExeEnvironmentOnlineToTest();
// Assert.assertTrue(result);
}
}

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java

@ -163,7 +163,7 @@ public class FailoverServiceTest {
given(processService.queryNeedFailoverProcessInstances(Mockito.anyString()))
.willReturn(Arrays.asList(processInstance));
doNothing().when(processService).processNeedFailoverProcessInstances(Mockito.any(ProcessInstance.class));
given(processService.findValidTaskListByProcessId(Mockito.anyInt()))
given(processService.findValidTaskListByProcessId(Mockito.anyInt(), Mockito.anyInt()))
.willReturn(Lists.newArrayList(masterTaskInstance, workerTaskInstance));
given(processService.findProcessInstanceDetailById(Mockito.anyInt()))
.willReturn(Optional.ofNullable(processInstance));

10
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -145,7 +145,7 @@ public interface ProcessService {
List<Integer> findTaskIdByInstanceState(int instanceId, TaskExecutionStatus state);
List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId);
List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag);
List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId);
@ -190,11 +190,11 @@ public interface ProcessService {
List<Schedule> selectAllByProcessDefineCode(long[] codes);
ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval);
ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval);
ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime);
ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag);
String queryUserQueueByProcessInstance(ProcessInstance processInstance);
@ -298,4 +298,6 @@ public interface ProcessService {
public String findConfigYamlByName(String clusterName);
void forceProcessInstanceSuccessByTaskInstanceId(Integer taskInstanceId);
Integer queryTestDataSourceId(Integer onlineDataSourceId);
}

52
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -305,6 +305,7 @@ public class ProcessServiceImpl implements ProcessService {
}
processInstance.setCommandType(command.getCommandType());
processInstance.addHistoryCmd(command.getCommandType());
processInstance.setTestFlag(command.getTestFlag());
// if the processDefinition is serial
ProcessDefinition processDefinition = this.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
@ -600,7 +601,8 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
public void removeTaskLogFile(Integer processInstanceId) {
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId,processInstance.getTestFlag());
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
@ -620,7 +622,8 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) {
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId,processInstance.getTestFlag());
if (CollectionUtils.isEmpty(taskInstanceList)) {
return;
}
@ -696,7 +699,8 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.getProcessInstancePriority(),
processInstance.getDryRun(),
processInstance.getId(),
processInstance.getProcessDefinitionVersion());
processInstance.getProcessDefinitionVersion(),
processInstance.getTestFlag());
saveCommand(command);
return;
}
@ -776,6 +780,7 @@ public class ProcessServiceImpl implements ProcessService {
Integer warningGroupId = command.getWarningGroupId() == null ? 0 : command.getWarningGroupId();
processInstance.setWarningGroupId(warningGroupId);
processInstance.setDryRun(command.getDryRun());
processInstance.setTestFlag(command.getTestFlag());
if (command.getScheduleTime() != null) {
processInstance.setScheduleTime(command.getScheduleTime());
@ -1032,7 +1037,7 @@ public class ProcessServiceImpl implements ProcessService {
case COMPLEMENT_DATA:
// delete all the valid tasks when complement data if id is not null
if (processInstance.getId() != null) {
List<TaskInstance> taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId());
List<TaskInstance> taskInstanceList = this.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
for (TaskInstance taskInstance : taskInstanceList) {
taskInstance.setFlag(Flag.NO);
this.updateTaskInstance(taskInstance);
@ -1046,7 +1051,7 @@ public class ProcessServiceImpl implements ProcessService {
processInstance.setCommandParam(JSONUtils.toJsonString(cmdParam));
}
// delete all the valid tasks when repeat running
List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());
List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId(),processInstance.getTestFlag());
for (TaskInstance taskInstance : validTaskList) {
taskInstance.setFlag(Flag.NO);
updateTaskInstance(taskInstance);
@ -1520,7 +1525,8 @@ public class ProcessServiceImpl implements ProcessService {
parentProcessInstance.getProcessInstancePriority(),
parentProcessInstance.getDryRun(),
subProcessInstanceId,
subProcessDefinition.getVersion());
subProcessDefinition.getVersion(),
parentProcessInstance.getTestFlag());
}
/**
@ -1648,7 +1654,7 @@ public class ProcessServiceImpl implements ProcessService {
if (failureStrategy == FailureStrategy.CONTINUE) {
return true;
}
List<TaskInstance> taskInstances = this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
List<TaskInstance> taskInstances = this.findValidTaskListByProcessId(taskInstance.getProcessInstanceId(),taskInstance.getTestFlag());
for (TaskInstance task : taskInstances) {
if (task.getState() == TaskExecutionStatus.FAILURE
@ -1751,6 +1757,7 @@ public class ProcessServiceImpl implements ProcessService {
taskInstance.getTaskDefinitionVersion());
this.updateTaskDefinitionResources(taskDefinition);
taskInstance.setTaskDefine(taskDefinition);
taskInstance.setTestFlag(processInstance.getTestFlag());
}
/**
@ -1838,11 +1845,12 @@ public class ProcessServiceImpl implements ProcessService {
* find valid task list by process definition id
*
* @param processInstanceId processInstanceId
* @param testFlag testFlag
* @return task instance list
*/
@Override
public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId) {
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES);
public List<TaskInstance> findValidTaskListByProcessId(Integer processInstanceId, int testFlag) {
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.YES, testFlag);
}
/**
@ -1853,7 +1861,8 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
public List<TaskInstance> findPreviousTaskListByWorkProcessId(Integer processInstanceId) {
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO);
ProcessInstance processInstance = processInstanceMapper.selectById(processInstanceId);
return taskInstanceMapper.findValidTaskListByProcessId(processInstanceId, Flag.NO, processInstance.getTestFlag());
}
/**
@ -2085,6 +2094,7 @@ public class ProcessServiceImpl implements ProcessService {
cmd.setExecutorId(processInstance.getExecutorId());
cmd.setCommandType(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS);
cmd.setProcessInstancePriority(processInstance.getProcessInstancePriority());
cmd.setTestFlag(processInstance.getTestFlag());
createCommand(cmd);
}
@ -2184,10 +2194,11 @@ public class ProcessServiceImpl implements ProcessService {
* @return process instance
*/
@Override
public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) {
public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
return processInstanceMapper.queryLastSchedulerProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime());
dateInterval.getEndTime(),
testFlag);
}
/**
@ -2198,10 +2209,11 @@ public class ProcessServiceImpl implements ProcessService {
* @return process instance
*/
@Override
public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) {
public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
return processInstanceMapper.queryLastManualProcess(definitionCode,
dateInterval.getStartTime(),
dateInterval.getEndTime());
dateInterval.getEndTime(),
testFlag);
}
/**
@ -2213,10 +2225,11 @@ public class ProcessServiceImpl implements ProcessService {
* @return process instance
*/
@Override
public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) {
public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime, int testFlag) {
return processInstanceMapper.queryLastRunningProcess(definitionCode,
startTime,
endTime,
testFlag,
WorkflowExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
@ -3144,7 +3157,7 @@ public class ProcessServiceImpl implements ProcessService {
ProcessInstance processInstance = findProcessInstanceDetailById(task.getProcessInstanceId()).orElse(null);
if (processInstance != null
&& (processInstance.getState().isFailure() || processInstance.getState().isStop())) {
List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId());
List<TaskInstance> validTaskList = findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag());
List<Long> instanceTaskCodeList =
validTaskList.stream().map(TaskInstance::getTaskCode).collect(Collectors.toList());
List<ProcessTaskRelation> taskRelations = findRelationByCode(processInstance.getProcessDefinitionCode(),
@ -3166,4 +3179,11 @@ public class ProcessServiceImpl implements ProcessService {
}
}
}
@Override
public Integer queryTestDataSourceId(Integer onlineDataSourceId) {
Integer testDataSourceId = dataSourceMapper.queryTestDataSourceId(onlineDataSourceId);
if (testDataSourceId!=null) return testDataSourceId;
return null;
}
}

35
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java

@ -51,6 +51,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.CommandMapper;
import org.apache.dolphinscheduler.dao.mapper.DataSourceMapper;
import org.apache.dolphinscheduler.dao.mapper.DqComparisonTypeMapper;
import org.apache.dolphinscheduler.dao.mapper.DqExecuteResultMapper;
import org.apache.dolphinscheduler.dao.mapper.DqRuleExecuteSqlMapper;
@ -76,6 +77,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.InputType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OptionSourceType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ValueType;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.service.cron.CronUtilsTest;
import org.apache.dolphinscheduler.service.exceptions.CronParseException;
@ -148,6 +150,8 @@ public class ProcessServiceTest {
@Mock
private TaskGroupMapper taskGroupMapper;
@Mock
private DataSourceMapper dataSourceMapper;
@Mock
private TaskGroupQueueMapper taskGroupQueueMapper;
@Mock
@ -901,6 +905,37 @@ public class ProcessServiceTest {
Assert.assertEquals(0, commandList.size());
}
@Test
public void testFindLastManualProcessInterval() {
long definitionCode = 1L;
DateInterval dateInterval = new DateInterval(new Date(), new Date());
int testFlag = 1;
//find test lastManualProcessInterval
ProcessInstance lastManualProcessInterval = processService.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
Assert.assertEquals(null, lastManualProcessInterval);
//find online lastManualProcessInterval
testFlag = 0;
lastManualProcessInterval = processService.findLastManualProcessInterval(definitionCode, dateInterval, testFlag);
Assert.assertEquals(null, lastManualProcessInterval);
}
@Test
public void testQueryTestDataSourceId() {
Integer onlineDataSourceId = 1;
//unbound testDataSourceId
Mockito.when(dataSourceMapper.queryTestDataSourceId(any(Integer.class))).thenReturn(null);
Integer result = processService.queryTestDataSourceId(onlineDataSourceId);
Assert.assertNull(result);
//bound testDataSourceId
Integer testDataSourceId = 2;
Mockito.when(dataSourceMapper.queryTestDataSourceId(any(Integer.class))).thenReturn(testDataSourceId);
result = processService.queryTestDataSourceId(onlineDataSourceId);
Assert.assertNotNull(result);
}
private TaskGroupQueue getTaskGroupQueue() {
TaskGroupQueue taskGroupQueue = new TaskGroupQueue();
taskGroupQueue.setTaskName("task name");

5
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java

@ -261,4 +261,9 @@ public class TaskExecutionContext implements Serializable {
* max memory
*/
private Integer memoryMax;
/**
* test flag
*/
private int testFlag;
}

12
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -23,12 +23,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.*;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
@ -91,6 +86,8 @@ public class SqlTask extends AbstractTask {
private SQLTaskExecutionContext sqlTaskExecutionContext;
public static final int TEST_FLAG_YES = 1;
/**
* Abstract Yarn Task
*
@ -102,6 +99,9 @@ public class SqlTask extends AbstractTask {
this.sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
assert sqlParameters != null;
if (taskExecutionContext.getTestFlag() == TEST_FLAG_YES && this.sqlParameters.getDatasource() == 0) {
throw new RuntimeException("unbound test data source");
}
if (!sqlParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
}

1
dolphinscheduler-ui/src/common/common.ts

@ -37,7 +37,6 @@ import {
import { format, parseISO } from 'date-fns'
import _ from 'lodash'
import { ITaskStateConfig } from './types'
/**
* Intelligent display kb m
*/

10
dolphinscheduler-ui/src/locales/en_US/datasource.ts

@ -26,15 +26,21 @@ export default {
datasource_parameter: 'Datasource Parameter',
description: 'Description',
description_tips: 'Please enter description',
test_datasource: 'Test data source',
online_datasource: 'Online data source',
bind_test_datasource: 'Bind test data source',
create_time: 'Create Time',
update_time: 'Update Time',
operation: 'Operation',
datasource_definition: 'Datasource Definition',
click_to_view: 'Click to view',
delete: 'Delete',
confirm: 'Confirm',
delete_confirm: 'Delete?',
cancel: 'Cancel',
create: 'Create',
on_line: 'Online',
test: 'Test',
edit: 'Edit',
success: 'Success',
test_connect: 'Test Connect',
@ -64,5 +70,7 @@ export default {
user_password_tips: 'Please enter your password',
aws_region: 'Aws Region',
aws_region_tips: 'Please enter AwsRegion',
jdbc_format_tips: 'jdbc connection parameters is not a correct JSON format'
jdbc_format_tips: 'jdbc connection parameters is not a correct JSON format',
datasource_test_flag_tips: 'Please select a data source definition',
datasource_bind_test_id_tips: 'Please bind the test data source'
}

7
dolphinscheduler-ui/src/locales/en_US/project.ts

@ -39,6 +39,9 @@ export default {
delete_confirm: 'Delete?'
},
workflow: {
on_line: 'Online',
test: 'Test',
operating_environment: 'Operating Environment',
workflow_relation: 'Workflow Relation',
create_workflow: 'Create Workflow',
import_workflow: 'Import Workflow',
@ -103,6 +106,7 @@ export default {
complement_data: 'Complement Data',
startup_parameter: 'Startup Parameter',
whether_dry_run: 'Whether Dry-Run',
whether_test: 'Whether Test',
continue: 'Continue',
end: 'End',
none_send: 'None',
@ -202,6 +206,9 @@ export default {
'There is not any workflows. Please create a workflow, and then visit this page again.'
},
task: {
on_line: 'Online',
test: 'Test',
operating_environment: 'Operating Environment',
cancel_full_screen: 'Cancel full screen',
enter_full_screen: 'Enter full screen',
current_task_settings: 'Current task settings',

10
dolphinscheduler-ui/src/locales/zh_CN/datasource.ts

@ -26,10 +26,16 @@ export default {
datasource_parameter: '数据源参数',
description: '描述',
description_tips: '请输入描述',
test_datasource: '测试数据源',
online_datasource: '上线数据源',
bind_test_datasource: '绑定测试数据源',
create_time: '创建时间',
update_time: '更新时间',
operation: '操作',
click_to_view: '点击查看',
datasource_definition: '数据源定义',
on_line: '线上',
test: '测试',
delete: '删除',
confirm: '确定',
delete_confirm: '删除?',
@ -61,5 +67,7 @@ export default {
user_password_tips: '请输入密码',
aws_region: 'AwsRegion',
aws_region_tips: '请输入AwsRegion',
jdbc_format_tips: 'jdbc连接参数不是一个正确的JSON格式'
jdbc_format_tips: 'jdbc连接参数不是一个正确的JSON格式',
datasource_test_flag_tips: '请选择数据源定义',
datasource_bind_test_id_tips: '请绑定测试数据源'
}

7
dolphinscheduler-ui/src/locales/zh_CN/project.ts

@ -39,6 +39,9 @@ export default {
delete_confirm: '确定删除吗?'
},
workflow: {
on_line: '线上',
test: '测试',
operating_environment: '运行环境',
workflow_relation: '工作流关系',
create_workflow: '创建工作流',
import_workflow: '导入工作流',
@ -105,6 +108,7 @@ export default {
complement_data: '补数',
startup_parameter: '启动参数',
whether_dry_run: '是否空跑',
whether_test: '是否测试',
continue: '继续',
end: '结束',
none_send: '都不发',
@ -203,6 +207,9 @@ export default {
'目前没有任何工作流,请先创建工作流,再访问该页面'
},
task: {
on_line: '线上',
test: '测试',
operating_environment: '运行环境',
cancel_full_screen: '取消全屏',
enter_full_screen: '全屏',
current_task_settings: '当前任务设置',

2
dolphinscheduler-ui/src/service/modules/data-source/types.ts

@ -45,6 +45,8 @@ interface IDataSource {
database?: string
connectType?: string
other?: object
testFlag?: number
bindTestId?: number
}
interface ListReq {

1
dolphinscheduler-ui/src/service/modules/process-instances/types.ts

@ -101,6 +101,7 @@ interface IWorkflowInstance {
count?: number
disabled?: boolean
buttonType?: string
testFlag: number
}
export {

45
dolphinscheduler-ui/src/utils/environmental-distinction.ts

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {h} from "vue";
import {NTag} from "naive-ui";
export function renderEnvironmentalDistinctionCell(
testFlag: number | undefined,
t: Function
) {
if (testFlag === 0) {
return h(
NTag,
{ type: 'success', size: 'small' },
{
default: () => t('datasource.on_line')
}
)
} else if (testFlag === 1) {
return h(
NTag,
{ type: 'warning', size: 'small' },
{
default: () => t('datasource.test')
}
)
} else {
return '-'
}
}

37
dolphinscheduler-ui/src/views/datasource/list/detail.tsx

@ -60,7 +60,9 @@ const DetailModal = defineComponent({
state,
changeType,
changePort,
changeTestFlag,
resetFieldsValue,
getSameTypeTestDataSource,
setFieldsValue,
getFieldsValue
} = useForm(props.id)
@ -89,6 +91,7 @@ const DetailModal = defineComponent({
const onChangeType = changeType
const onChangePort = changePort
const onChangeTestFlag = changeTestFlag
const trim = getCurrentInstance()?.appContext.config.globalProperties.trim
@ -102,6 +105,7 @@ const DetailModal = defineComponent({
datasourceType[state.detailForm.type]
))
props.show && props.id && setFieldsValue(await queryById(props.id))
props.show && state.detailForm.testFlag == 0 && await getSameTypeTestDataSource()
}
)
@ -110,6 +114,7 @@ const DetailModal = defineComponent({
...toRefs(state),
...toRefs(status),
onChangeType,
onChangeTestFlag,
onChangePort,
onSubmit,
onTest,
@ -134,6 +139,7 @@ const DetailModal = defineComponent({
saving,
testing,
onChangeType,
onChangeTestFlag,
onChangePort,
onCancel,
onTest,
@ -365,6 +371,37 @@ const DetailModal = defineComponent({
)}`}
/>
</NFormItem>
<NFormItem
label={t('datasource.datasource_definition')}
path='testFlag'
show-require-mark
>
<NRadioGroup
v-model={[detailForm.testFlag, 'value']}
onUpdate:value={onChangeTestFlag}
>
<NSpace>
<NRadio value={1} class='radio-test-datasource'>
{t('datasource.test_datasource')}
</NRadio>
<NRadio value={0} class='radio-online-datasource'>
{t('datasource.online_datasource')}
</NRadio>
</NSpace>
</NRadioGroup>
</NFormItem>
<NFormItem
v-show={detailForm.testFlag == 0}
label={t('datasource.bind_test_datasource')}
path='bindTestId'
show-require-mark
>
<NSelect
class='select-bind-test-data-source-type-drop-down'
v-model={[detailForm.bindTestId, 'value']}
options={this.bindTestDataSourceExample}
/>
</NFormItem>
</NForm>
</NSpin>
),

10
dolphinscheduler-ui/src/views/datasource/list/use-columns.ts

@ -28,12 +28,13 @@ import {
import { EditOutlined, DeleteOutlined } from '@vicons/antd'
import JsonHighlight from './json-highlight'
import ButtonLink from '@/components/button-link'
import type { IDataSource, TableColumns } from './types'
import {
COLUMN_WIDTH_CONFIG,
calculateTableWidth,
DefaultTableWidth
} from '@/common/column-width-config'
import type { TableColumns } from './types'
import {renderEnvironmentalDistinctionCell} from "@/utils/environmental-distinction";
export function useColumns(onCallback: Function) {
const { t } = useI18n()
@ -61,6 +62,13 @@ export function useColumns(onCallback: Function) {
key: 'type',
width: 180
},
{
title: t('datasource.datasource_definition'),
key: 'testFlag',
width: 140,
render: (_row: IDataSource) =>
renderEnvironmentalDistinctionCell(_row.testFlag, t)
},
{
title: t('datasource.datasource_parameter'),
key: 'parameter',

60
dolphinscheduler-ui/src/views/datasource/list/use-form.ts

@ -17,7 +17,10 @@
import { reactive, ref } from 'vue'
import { useI18n } from 'vue-i18n'
import { getKerberosStartupState } from '@/service/modules/data-source'
import {
getKerberosStartupState,
queryDataSourceList
} from '@/service/modules/data-source'
import type { FormRules } from 'naive-ui'
import type {
IDataSourceDetail,
@ -27,7 +30,7 @@ import type {
IDataSource
} from './types'
import utils from '@/utils'
import type { TypeReq } from '@/service/modules/data-source/types'
export function useForm(id?: number) {
const { t } = useI18n()
@ -45,7 +48,9 @@ export function useForm(id?: number) {
password: '',
database: '',
connectType: '',
other: ''
other: '',
testFlag: -1,
bindTestId: undefined
} as IDataSourceDetail
const state = reactive({
@ -57,6 +62,7 @@ export function useForm(id?: number) {
showAwsRegion: false,
showConnectType: false,
showPrincipal: false,
bindTestDataSourceExample: [] as { label: string; value: number }[],
rules: {
name: {
trigger: ['input'],
@ -129,6 +135,22 @@ export function useForm(id?: number) {
return new Error(t('datasource.jdbc_format_tips'))
}
}
},
testFlag: {
trigger: ['input'],
validator() {
if (-1 === state.detailForm.testFlag) {
return new Error(t('datasource.datasource_test_flag_tips'))
}
}
},
bindTestId: {
trigger: ['input'],
validator() {
if (0 === state.detailForm.testFlag && !state.detailForm.bindTestId) {
return new Error(t('datasource.datasource_bind_test_id_tips'))
}
}
}
} as FormRules
})
@ -153,6 +175,9 @@ export function useForm(id?: number) {
} else {
state.showPrincipal = false
}
if (state.detailForm.id === undefined) {
await getSameTypeTestDataSource()
}
}
const changePort = async () => {
@ -160,6 +185,31 @@ export function useForm(id?: number) {
const currentDataBaseOption = datasourceType[state.detailForm.type]
currentDataBaseOption.previousPort = state.detailForm.port
}
const changeTestFlag = async (testFlag: IDataBase) => {
if (testFlag) {
state.detailForm.bindTestId = undefined
}
// @ts-ignore
if (state.detailForm.id !== undefined && testFlag === 0) {
await getSameTypeTestDataSource()
}
}
const getSameTypeTestDataSource = async () => {
const params = { type: state.detailForm.type, testFlag: 1 } as TypeReq
const result = await queryDataSourceList(params)
state.bindTestDataSourceExample = result
.filter((value: { label: string; value: string }) => {
// @ts-ignore
if (state.detailForm.id && state.detailForm.id === value.id)
return false
return true
})
.map((TestDataSourceExample: { name: string; id: number }) => ({
label: TestDataSourceExample.name,
value: TestDataSourceExample.id
}))
}
const resetFieldsValue = () => {
state.detailForm = { ...initialValues }
@ -179,7 +229,9 @@ export function useForm(id?: number) {
state,
changeType,
changePort,
changeTestFlag,
resetFieldsValue,
getSameTypeTestDataSource,
setFieldsValue,
getFieldsValue
}
@ -248,4 +300,4 @@ export const datasourceTypeList: IDataBaseOption[] = Object.values(
).map((item) => {
item.class = 'options-datasource-type'
return item
})
})

6
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-datasource.ts

@ -29,6 +29,7 @@ export function useDatasource(
typeField?: string
sourceField?: string
span?: Ref | number
testFlag?: Ref | number
} = {}
): IJsonItem[] {
const { t } = useI18n()
@ -109,7 +110,10 @@ export function useDatasource(
}
const refreshOptions = async () => {
const parameters = { type: model[params.typeField || 'type'] } as TypeReq
const parameters = {
type: model[params.typeField || 'type'],
testFlag: 0
} as TypeReq
const res = await queryDataSourceList(parameters)
datasourceOptions.value = res.map((item: any) => ({
label: item.name,

5
dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-sqoop-datasource.ts

@ -19,7 +19,7 @@ import { onMounted, ref, Ref } from 'vue'
import { queryDataSourceList } from '@/service/modules/data-source'
import { useI18n } from 'vue-i18n'
import type { IJsonItem, IDataBase } from '../types'
import type { TypeReq } from '@/service/modules/data-source/types'
export function useDatasource(
model: { [field: string]: any },
span: Ref,
@ -33,7 +33,8 @@ export function useDatasource(
const getDataSource = async (type: IDataBase) => {
if (loading.value) return
loading.value = true
const result = await queryDataSourceList({ type })
const params = { type, testFlag: 0 } as TypeReq
const result = await queryDataSourceList(params)
dataSourceList.value = result.map((item: { name: string; id: number }) => ({
label: item.name,
value: item.id

1
dolphinscheduler-ui/src/views/projects/task/instance/types.ts

@ -32,6 +32,7 @@ interface IRecord {
retryTimes: number
dryRun: number
host: string
testFlag?: number
}
export { ITaskState, IRecord }

24
dolphinscheduler-ui/src/views/projects/task/instance/use-table.ts

@ -23,7 +23,14 @@ import {
forceSuccess,
downloadLog
} from '@/service/modules/task-instances'
import { NButton, NIcon, NSpace, NTooltip, NSpin, NEllipsis } from 'naive-ui'
import {
NButton,
NIcon,
NSpace,
NTooltip,
NSpin,
NEllipsis
} from 'naive-ui'
import ButtonLink from '@/components/button-link'
import {
AlignLeftOutlined,
@ -32,13 +39,19 @@ import {
} from '@vicons/antd'
import { format } from 'date-fns'
import { useRoute, useRouter } from 'vue-router'
import { parseTime, renderTableTime, tasksState } from '@/common/common'
import {
parseTime,
renderTableTime,
tasksState
} from '@/common/common'
import {
COLUMN_WIDTH_CONFIG,
calculateTableWidth,
DefaultTableWidth
} from '@/common/column-width-config'
import type { Router, TaskInstancesRes, IRecord, ITaskState } from './types'
import {renderEnvironmentalDistinctionCell} from "@/utils/environmental-distinction";
export function useTable() {
const { t } = useI18n()
@ -117,6 +130,13 @@ export function useTable() {
key: 'executorName',
...COLUMN_WIDTH_CONFIG['name']
},
{
title: t('project.task.operating_environment'),
key: 'testFlag',
width: 160,
render: (_row: IRecord) =>
renderEnvironmentalDistinctionCell(_row.testFlag, t)
},
{
title: t('project.task.node_type'),
key: 'taskType',

7
dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx

@ -499,6 +499,13 @@ export default defineComponent({
v-model:value={this.startForm.dryRun}
/>
</NFormItem>
<NFormItem label={t('project.workflow.whether_test')} path='testFlag'>
<NSwitch
checkedValue={1}
uncheckedValue={0}
v-model:value={this.startForm.testFlag}
/>
</NFormItem>
</NForm>
</Modal>
)

3
dolphinscheduler-ui/src/views/projects/workflow/definition/components/use-form.ts

@ -66,7 +66,8 @@ export const useForm = () => {
environmentCode: null,
startParams: null,
expectedParallelismNumber: '',
dryRun: 0
dryRun: 0,
testFlag: 0
},
saving: false,
rules: {

16
dolphinscheduler-ui/src/views/projects/workflow/instance/use-table.ts

@ -21,7 +21,7 @@ import { useI18n } from 'vue-i18n'
import { useRouter } from 'vue-router'
import ButtonLink from '@/components/button-link'
import { RowKey } from 'naive-ui/lib/data-table/src/interface'
import { NEllipsis } from 'naive-ui'
import { NEllipsis} from 'naive-ui'
import {
queryProcessInstanceListPaging,
deleteProcessInstanceById,
@ -29,7 +29,10 @@ import {
} from '@/service/modules/process-instances'
import { execute } from '@/service/modules/executors'
import TableAction from './components/table-action'
import { renderTableTime, runningType } from '@/common/common'
import {
renderTableTime,
runningType
} from '@/common/common'
import { renderStateCell } from '../../task/instance/use-table'
import {
COLUMN_WIDTH_CONFIG,
@ -40,6 +43,7 @@ import type { Router } from 'vue-router'
import type { IWorkflowInstance } from '@/service/modules/process-instances/types'
import type { ICountDownParam } from './types'
import type { ExecuteReq } from '@/service/modules/executors/types'
import {renderEnvironmentalDistinctionCell} from "@/utils/environmental-distinction";
export function useTable() {
const { t } = useI18n()
@ -109,6 +113,14 @@ export function useTable() {
className: 'workflow-status',
render: (_row: IWorkflowInstance) => renderStateCell(_row.state, t)
},
{
title: t('project.workflow.operating_environment'),
key: 'testFlag',
width: 160,
className: 'workflow-testFlag',
render: (_row: IWorkflowInstance) =>
renderEnvironmentalDistinctionCell(_row.testFlag, t)
},
{
title: t('project.workflow.run_type'),
key: 'commandType',

25
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/DefaultWorkerDelayTaskExecuteRunnableTest.java

@ -70,4 +70,29 @@ public class DefaultWorkerDelayTaskExecuteRunnableTest {
Assertions.assertEquals(TaskExecutionStatus.SUCCESS, taskExecutionContext.getCurrentExecutionStatus());
}
@Test
public void testErrorboundTestDataSource() {
TaskExecutionContext taskExecutionContext = TaskExecutionContext.builder()
.dryRun(Constants.DRY_RUN_FLAG_NO)
.testFlag(Constants.TEST_FLAG_YES)
.taskInstanceId(0)
.processDefineId(0)
.firstSubmitTime(new Date())
.taskLogName("TestLogName")
.taskType("SQL")
.taskParams("{\"localParams\":[],\"resourceList\":[],\"type\":\"POSTGRESQL\",\"datasource\":null,\"sql\":\"select * from t_ds_user\",\"sqlType\":\"0\",\"preStatements\":[],\"postStatements\":[],\"segmentSeparator\":\"\",\"displayRows\":10,\"conditionResult\":\"null\",\"dependence\":\"null\",\"switchResult\":\"null\",\"waitStartTimeout\":null}")
.build();
WorkerTaskExecuteRunnable workerTaskExecuteRunnable = new DefaultWorkerDelayTaskExecuteRunnable(
taskExecutionContext,
workerConfig,
masterAddress,
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate
);
Assertions.assertAll(workerTaskExecuteRunnable::run);
Assertions.assertEquals(TaskExecutionStatus.FAILURE, taskExecutionContext.getCurrentExecutionStatus());
}
}
Loading…
Cancel
Save